package main import ( "context" "dsSupport/Utils/CommonUtil" "dsSupport/Utils/DbUtil" "dsSupport/Utils/PgUtil" "encoding/json" "fmt" "github.com/olivere/elastic/v7" "github.com/tidwall/gjson" "log" "os" "time" ) // 深度分页 // https://www.cnblogs.com/hello-shf/p/11543453.html //MYSQL数据连接器,用于读取元数据信息 var mysqlDb = DbUtil.Engine //GreenPlum数据连接器,用于写入、修改GPDB的数据 var pgDb = PgUtil.Engine //用于批量一次性插入的数据集合 var insertStrArray = make([]string, 0) //用于批量更新的数据集合,一般是业务主键ID集合,修改enable_falg=0 var pkStrArray = make([]string, 0) //缓存表的结构 type CacheFieldStruct struct { cacheFieldKeys []string tableId int } var CacheFieldBean CacheFieldStruct func main() { //TODO //需要将ES的连接信息修改到配置文件中,目前想来应该是数据交换平台的概念,需要在dsDataex项目中实现,待迁移 var host = "http://10.10.14.188:9200/" esClient, err := elastic.NewClient( elastic.SetURL(host), elastic.SetSniff(false), elastic.SetHealthcheckInterval(10*time.Second), elastic.SetGzip(true), elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), ) if err != nil { //如果是集成到dsDataex项目中,以协程方式启动,这里就不有使用painc(err)会异常会导致程序退出 panic(err) } //从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持 sql := "select * from t_dw_table where b_use=1" tableList, _ := mysqlDb.SQL(sql).Query().List() for i := range tableList { tableId := tableList[i]["table_id"].(int64) indexName := tableList[i]["table_name"].(string) //预热数据表的列名 getFields(int(tableId)) // 当前表的主键是什么,目前只支持单业务主键,复合主键的不支持 sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1` list, _ := mysqlDb.SQL(sql, tableId).Query().List() pk := list[0]["field_name"].(string) //取所有 CTX := context.Background() result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX) if err != nil { panic(err) } //第一次的结果集 for i := range result.Hits.Hits { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") addRecord(pk, value.String()) } //批量执行 batchSave(indexName, pk) //继续用的 scoll_id scrollId := result.ScrollId //第一次命中的个数 var nowCount = int64(len(result.Hits.Hits)) //总的命中个数 allCount := result.TotalHits() //开始循环 for { //清空 insertStrArray = insertStrArray[0:0] pkStrArray = pkStrArray[0:0] //如果还有数据没有获取 if allCount > nowCount { result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) scrollId = result.ScrollId nowCount += int64(len(result.Hits.Hits)) for i := range result.Hits.Hits { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") addRecord(pk, value.String()) } } else { //没有数据了 break } //批量执行 batchSave(indexName, pk) } fmt.Println("索引"+indexName+"同步完成!") } } /** 功能:获取指定表的字段有哪些,应该是一个稳定的排序,因为后面要用到 作者:黄海 时间:2020-09-07 */ func getFields(tableId int) { //清空 CacheFieldBean.cacheFieldKeys = CacheFieldBean.cacheFieldKeys[:] //填充 sql := `select field_name from t_dw_table_field where table_id=? order by sort_id,field_name` list, _ := mysqlDb.SQL(sql, tableId).Query().List() for i := range list { fieldName := list[i]["field_name"].(string) CacheFieldBean.cacheFieldKeys = append(CacheFieldBean.cacheFieldKeys, fieldName) } CacheFieldBean.tableId = tableId } //后续优化为到达一定的阀值再提交一次,初步定为200一次 func addRecord(pk string, jsonStr string) { var mymap map[string]interface{} if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { //组装pkArray,用来update enable_flag switch (mymap[pk]).(type) { case string: pkStrArray = append(pkStrArray, mymap[pk].(string)) break case float64: pkStrArray = append(pkStrArray, CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64)))) break default: break } //另一个任务是组装insertStrArray,准备批量提交 var lineSql = "" //根据key从m中拿元素,就是按顺序拿了 for _, k := range CacheFieldBean.cacheFieldKeys { switch mymap[k].(type) { case string: lineSql += "'" + mymap[k].(string) + "'," break case int64: lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + "," break case float64: lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + "," break default: lineSql += "'-1'," //进入这里一般是有这个字段,但数据源中没有这列数据,按-1填充吧~ break } } lineSql += "'" + CommonUtil.GetUUID() + "'" insertStrArray = append(insertStrArray, lineSql) } else { fmt.Println(err.Error()) } } //提交 func batchSave(tableName string, pkName string) { var pkStr = "" //数组去重 pkStrArray = CommonUtil.RemoveRepeatedElement(pkStrArray) for i := range pkStrArray { pkStr += "'" + pkStrArray[i] + "'," } pkStr = pkStr[0 : len(pkStr)-1] sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr + ") and enable_flag=1" pgDb.SQL(sql).Execute() //插入 sql = `insert into ` + tableName + "(" //根据key从m中拿元素,就是按顺序拿了 for _, k := range CacheFieldBean.cacheFieldKeys { sql += k + "," } sql += "uuid" var lineSql = "" for i := range insertStrArray { lineSql += "(" + insertStrArray[i] + ")," } lineSql = lineSql[0 : len(lineSql)-1] sql += ") values " + lineSql + ";" _, err := pgDb.SQL(sql).Execute() if err != nil { fmt.Println(sql) panic(err) } fmt.Println("批量执行" + CommonUtil.ConvertIntToString(len(insertStrArray)) + "条.") }