You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
6.0 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package main
import (
"context"
"dsSupport/Utils/CommonUtil"
"dsSupport/Utils/DbUtil"
"dsSupport/Utils/PgUtil"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"github.com/tidwall/gjson"
"log"
"os"
"time"
)
// 深度分页
// https://www.cnblogs.com/hello-shf/p/11543453.html
//MYSQL数据连接器用于读取元数据信息
var mysqlDb = DbUtil.Engine
//GreenPlum数据连接器用于写入、修改GPDB的数据
var pgDb = PgUtil.Engine
//用于批量一次性插入的数据集合
var insertStrArray = make([]string, 0)
//用于批量更新的数据集合一般是业务主键ID集合修改enable_falg=0
var pkStrArray = make([]string, 0)
//缓存表的结构
var cacheFieldKeys []string
func main() {
//TODO
//需要将ES的连接信息修改到配置文件中,目前想来应该是数据交换平台的概念需要在dsDataex项目中实现待迁移
var host = "http://10.10.14.188:9200/"
esClient, err := elastic.NewClient(
elastic.SetURL(host),
elastic.SetSniff(false),
elastic.SetHealthcheckInterval(10*time.Second),
elastic.SetGzip(true),
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
)
if err != nil {
//如果是集成到dsDataex项目中以协程方式启动这里就不有使用painc(err)会异常会导致程序退出
panic(err)
}
//从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持
sql := "select * from t_dw_table where b_use=1"
tableList, _ := mysqlDb.SQL(sql).Query().List()
for i := range tableList {
tableId := tableList[i]["table_id"].(int64)
indexName := tableList[i]["table_name"].(string)
//预热数据表的列名
getFields(int(tableId))
//清空重新导入
sql = "truncate table " + indexName
pgDb.SQL(sql).Execute()
// 当前表的主键是什么,目前只支持单业务主键,复合主键的不支持
sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1`
list, _ := mysqlDb.SQL(sql, tableId).Query().List()
pk := list[0]["field_name"].(string)
//取所有
CTX := context.Background()
result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX)
if err != nil {
panic(err)
}
//第一次的结果集
for i := range result.Hits.Hits {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte)
value := gjson.Get(resStr, "data_content")
addRecord(pk, value.String())
}
//批量执行
batchSave(indexName, pk)
//继续用的 scoll_id
scrollId := result.ScrollId
//第一次命中的个数
var nowCount = int64(len(result.Hits.Hits))
//总的命中个数
allCount := result.TotalHits()
//开始循环
for {
//清空
insertStrArray = insertStrArray[0:0]
pkStrArray = pkStrArray[0:0]
//如果还有数据没有获取
if allCount > nowCount {
result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX)
scrollId = result.ScrollId
nowCount += int64(len(result.Hits.Hits))
for i := range result.Hits.Hits {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte)
value := gjson.Get(resStr, "data_content")
addRecord(pk, value.String())
}
} else {
//没有数据了
break
}
//批量执行
batchSave(indexName, pk)
}
fmt.Println("索引" + indexName + "同步完成!")
}
}
/**
功能:获取指定表的字段有哪些,应该是一个稳定的排序,因为后面要用到
作者:黄海
时间2020-09-07
*/
func getFields(tableId int) {
//清空
cacheFieldKeys = cacheFieldKeys[0:0]
//填充
sql := `select field_name from t_dw_table_field where table_id=? order by sort_id,field_name`
list, _ := mysqlDb.SQL(sql, tableId).Query().List()
for i := range list {
fieldName := list[i]["field_name"].(string)
cacheFieldKeys = append(cacheFieldKeys, fieldName)
}
}
//后续优化为到达一定的阀值再提交一次初步定为200一次
func addRecord(pk string, jsonStr string) {
var mymap map[string]interface{}
if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil {
//组装pkArray,用来update enable_flag
switch (mymap[pk]).(type) {
case string:
pkStrArray = append(pkStrArray, mymap[pk].(string))
break
case float64:
pkStrArray = append(pkStrArray, CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64))))
break
default:
break
}
//另一个任务是组装insertStrArray,准备批量提交
var lineSql = ""
//根据key从m中拿元素就是按顺序拿了
for _, k := range cacheFieldKeys {
switch mymap[k].(type) {
case string:
lineSql += "'" + mymap[k].(string) + "',"
break
case int64:
lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + ","
break
case float64:
lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + ","
break
default:
lineSql += "'-1'," //进入这里一般是有这个字段,但数据源中没有这列数据,按-1填充吧~
break
}
}
lineSql += "'" + CommonUtil.GetUUID() + "'"
insertStrArray = append(insertStrArray, lineSql)
} else {
fmt.Println(err.Error())
}
}
//提交
func batchSave(tableName string, pkName string) {
var pkStr = ""
//数组去重
pkStrArray = CommonUtil.RemoveRepeatedElement(pkStrArray)
for i := range pkStrArray {
pkStr += "'" + pkStrArray[i] + "',"
}
pkStr = pkStr[0 : len(pkStr)-1]
sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr + ") and enable_flag=1"
pgDb.SQL(sql).Execute()
//插入
sql = `insert into ` + tableName + "("
//根据key从m中拿元素就是按顺序拿了
for _, k := range cacheFieldKeys {
sql += k + ","
}
sql += "uuid"
var lineSql = ""
for i := range insertStrArray {
lineSql += "(" + insertStrArray[i] + "),"
}
lineSql = lineSql[0 : len(lineSql)-1]
sql += ") values " + lineSql + ";"
_, err := pgDb.SQL(sql).Execute()
if err != nil {
fmt.Println(sql)
panic(err)
}
fmt.Println("批量执行" + CommonUtil.ConvertIntToString(len(insertStrArray)) + "条.")
}