From 52fa055c3509b6d21fa93c17cf5134531b19a815 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Mon, 7 Sep 2020 08:22:28 +0800 Subject: [PATCH] 'commit' --- .../ElasticsearchToGreenPlum.go | 175 ++++++++++-------- 1 file changed, 100 insertions(+), 75 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 8a82c369..e6200541 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -11,24 +11,35 @@ import ( "github.com/tidwall/gjson" "log" "os" - "sort" "time" ) // 深度分页 -// TODO -// 后续需要记录每个INDEX的最后同步时间,形成日志文件,现在是一次性全量同步 -// 因目前是以业务主键ID保持,所以比ES的数据量肯定是要少的,因为更新的日志记录都没有同步到数仓,只是最终有效的业务ID同步过来。 // https://www.cnblogs.com/hello-shf/p/11543453.html -var client elastic.Client +//MYSQL数据连接器,用于读取元数据信息 var mysqlDb = DbUtil.Engine + +//GreenPlum数据连接器,用于写入、修改GPDB的数据 var pgDb = PgUtil.Engine + +//用于批量一次性插入的数据集合 var insertStrArray = make([]string, 0) + +//用于批量更新的数据集合,一般是业务主键ID集合,修改enable_falg=0 var pkStrArray = make([]string, 0) -var keyMap = make(map[string]int, 0) + +//缓存表的结构 +type CacheFieldStruct struct { + cacheFieldKeys []string + tableId int +} + +var CacheFieldBean CacheFieldStruct func main() { + //TODO + //需要将ES的连接信息修改到配置文件中,目前想来应该是数据交换平台的概念,需要在dsDataex项目中实现,待迁移 var host = "http://10.10.14.188:9200/" esClient, err := elastic.NewClient( elastic.SetURL(host), @@ -38,74 +49,94 @@ func main() { elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), ) - if err != nil { + //如果是集成到dsDataex项目中,以协程方式启动,这里就不有使用painc(err)会异常会导致程序退出 panic(err) } - - //要同步的索引名称,也就是表名称 - indexName := "org_school" - keys := make([]string, 0) //从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持 - sql := "select * from t_dw_table where table_name=?" - list, _ := mysqlDb.SQL(sql, indexName).Query().List() - tableId := list[0]["table_id"].(int64) - sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1` - list, _ = mysqlDb.SQL(sql, tableId).Query().List() - pk := list[0]["field_name"].(string) - //取所有 - CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX) - if err != nil { - panic(err) - } - //第一次的结果集 - for i := range result.Hits.Hits { - resByte, _ := json.Marshal(result.Hits.Hits[i].Source) - resStr := string(resByte) - value := gjson.Get(resStr, "data_content") - keys = addRecord(pk, value.String()) - } - //批量执行 - batchSave(indexName, pk, keys) - - //继续用的 scoll_id - scrollId := result.ScrollId - //第一次命中的个数 - var nowCount = int64(len(result.Hits.Hits)) - //总的命中个数 - allCount := result.TotalHits() - - //开始循环 - for { - //清空 - insertStrArray = insertStrArray[0:0] - pkStrArray = pkStrArray[0:0] - //如果还有数据没有获取 - if allCount > nowCount { - result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) - scrollId = result.ScrollId - nowCount += int64(len(result.Hits.Hits)) - - for i := range result.Hits.Hits { - resByte, _ := json.Marshal(result.Hits.Hits[i].Source) - resStr := string(resByte) - value := gjson.Get(resStr, "data_content") - keys = addRecord(pk, value.String()) - } - } else { - //没有数据了 - break + sql := "select * from t_dw_table" + list, _ := mysqlDb.SQL(sql).Query().List() + for i := range list { + tableId := list[i]["table_id"].(int64) + indexName := list[i]["table_name"].(string) + //预热数据表的列名 + getFields(int(tableId)) + // 当前表的主键是什么,目前只支持单业务主键,复合主键的不支持 + sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1` + list, _ = mysqlDb.SQL(sql, tableId).Query().List() + pk := list[0]["field_name"].(string) + //取所有 + CTX := context.Background() + result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX) + if err != nil { + panic(err) + } + //第一次的结果集 + for i := range result.Hits.Hits { + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + resStr := string(resByte) + value := gjson.Get(resStr, "data_content") + addRecord(pk, value.String()) } //批量执行 - batchSave(indexName, pk, keys) + batchSave(indexName, pk) + //继续用的 scoll_id + scrollId := result.ScrollId + //第一次命中的个数 + var nowCount = int64(len(result.Hits.Hits)) + //总的命中个数 + allCount := result.TotalHits() + + //开始循环 + for { + //清空 + insertStrArray = insertStrArray[0:0] + pkStrArray = pkStrArray[0:0] + //如果还有数据没有获取 + if allCount > nowCount { + result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) + scrollId = result.ScrollId + nowCount += int64(len(result.Hits.Hits)) + + for i := range result.Hits.Hits { + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + resStr := string(resByte) + value := gjson.Get(resStr, "data_content") + addRecord(pk, value.String()) + } + } else { + //没有数据了 + break + } + //批量执行 + batchSave(indexName, pk) + } + fmt.Println("索引"+indexName+"同步完成!") } } +/** +功能:获取指定表的字段有哪些,应该是一个稳定的排序,因为后面要用到 +作者:黄海 +时间:2020-09-07 +*/ +func getFields(tableId int) { + //清空 + CacheFieldBean.cacheFieldKeys = CacheFieldBean.cacheFieldKeys[:] + //填充 + sql := `select field_name from t_dw_table_field where table_id=? order by sort_id,field_name` + list, _ := mysqlDb.SQL(sql, tableId).Query().List() + + for i := range list { + fieldName := list[i]["field_name"].(string) + CacheFieldBean.cacheFieldKeys = append(CacheFieldBean.cacheFieldKeys, fieldName) + } + CacheFieldBean.tableId = tableId +} + //后续优化为到达一定的阀值再提交一次,初步定为200一次 -func addRecord(pk string, jsonStr string) []string { +func addRecord(pk string, jsonStr string) { var mymap map[string]interface{} - var keys []string if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { //组装pkArray,用来update enable_flag switch (mymap[pk]).(type) { @@ -119,15 +150,9 @@ func addRecord(pk string, jsonStr string) []string { break } //另一个任务是组装insertStrArray,准备批量提交 - //拿到key - for k := range mymap { - keys = append(keys, k) - } - //对key排序 - sort.Strings(keys) var lineSql = "" //根据key从m中拿元素,就是按顺序拿了 - for _, k := range keys { + for _, k := range CacheFieldBean.cacheFieldKeys { switch mymap[k].(type) { case string: lineSql += "'" + mymap[k].(string) + "'," @@ -139,6 +164,7 @@ func addRecord(pk string, jsonStr string) []string { lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + "," break default: + lineSql += "'-1'," //进入这里一般是有这个字段,但数据源中没有这列数据,按-1填充吧~ break } } @@ -147,11 +173,10 @@ func addRecord(pk string, jsonStr string) []string { } else { fmt.Println(err.Error()) } - return keys } //提交 -func batchSave(tableName string, pkName string, keys []string) { +func batchSave(tableName string, pkName string) { var pkStr = "" //数组去重 pkStrArray = CommonUtil.RemoveRepeatedElement(pkStrArray) @@ -165,7 +190,7 @@ func batchSave(tableName string, pkName string, keys []string) { //插入 sql = `insert into ` + tableName + "(" //根据key从m中拿元素,就是按顺序拿了 - for _, k := range keys { + for _, k := range CacheFieldBean.cacheFieldKeys { sql += k + "," } sql += "uuid" @@ -176,8 +201,8 @@ func batchSave(tableName string, pkName string, keys []string) { } lineSql = lineSql[0 : len(lineSql)-1] sql += ") values " + lineSql + ";" - _,err:=pgDb.SQL(sql).Execute() - if err!=nil{ + _, err := pgDb.SQL(sql).Execute() + if err != nil { fmt.Println(sql) //panic(err) }