diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 8cfe6d59..3cc41826 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -3,6 +3,7 @@ package main import ( "context" "dsSupport/Utils/CommonUtil" + "dsSupport/Utils/DbUtil" "dsSupport/Utils/PgUtil" "encoding/json" "fmt" @@ -21,8 +22,10 @@ import ( // https://www.cnblogs.com/hello-shf/p/11543453.html var client elastic.Client -var db = PgUtil.Engine -var sqls = make([]string, 0) +var mysqlDb = DbUtil.Engine +var pgDb = PgUtil.Engine +var insertStrArray = make([]string, 0) +var pkStrArray = make([]string, 0) var keyMap = make(map[string]int, 0) func main() { @@ -42,26 +45,16 @@ func main() { //要同步的索引名称,也就是表名称 indexName := "org_school" - - sql := `SELECT - pg_constraint.conname AS pk_name, - pg_attribute.attname AS colname, - pg_type.typname AS typename - FROM - pg_constraint - INNER JOIN pg_class ON pg_constraint.conrelid = pg_class.oid - INNER JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid - AND pg_attribute.attnum = pg_constraint.conkey [ 1 ] - INNER JOIN pg_type ON pg_type.oid = pg_attribute.atttypid - WHERE - pg_class.relname = ? - AND pg_constraint.contype = 'p'` - list, _ := db.SQL(sql, indexName).Query().List() - pk := list[0]["colname"].(string) - + //从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持 + sql := "select * from t_dw_table where table_name=?" + list, _ := mysqlDb.SQL(sql, indexName).Query().List() + tableId := list[0]["table_id"].(int64) + sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1` + list, _ = mysqlDb.SQL(sql, tableId).Query().List() + pk := list[0]["field_name"].(string) //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(1000).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(100).Do(CTX) if err != nil { panic(err) } @@ -70,10 +63,10 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(indexName, pk, value.String()) + addRecord(pk, value.String()) } //批量执行 - batchSave(sqls) + batchSave(indexName, pk) //继续用的 scoll_id scrollId := result.ScrollId @@ -85,7 +78,8 @@ func main() { //开始循环 for { //清空 - sqls = sqls[0:0] + insertStrArray = insertStrArray[0:0] + pkStrArray = pkStrArray[0:0] //如果还有数据没有获取 if allCount > nowCount { result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) @@ -96,49 +90,43 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(indexName, pk, value.String()) + addRecord(pk, value.String()) } } else { //没有数据了 break } //批量执行 - batchSave(sqls) + batchSave(indexName, pk) } } //后续优化为到达一定的阀值再提交一次,初步定为200一次 -func addRecord(tableName string, pk string, jsonStr string) { +func addRecord(pk string, jsonStr string) { var mymap map[string]interface{} - var sql = "" if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { - //拿到key - var keys []string - for k := range mymap { - keys = append(keys, k) - } - //对key排序 - sort.Strings(keys) - - //先删除再插入 - sql = `delete from ` + tableName + ` where ` + pk + `=` + //组装pkArray,用来update enable_flag switch (mymap[pk]).(type) { case string: - sqls = append(sqls, sql+"'"+mymap[pk].(string)+"';") + pkStrArray = append(pkStrArray, mymap[pk].(string)) break case float64: - sqls = append(sqls, sql+CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64)))+";") + pkStrArray = append(pkStrArray, CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64)))) break default: break } - //插入 - sql = `insert into ` + tableName + "(" + //另一个任务是组装insertStrArray,准备批量提交 + //拿到key + var keys []string + for k := range mymap { + keys = append(keys, k) + } + //对key排序 + sort.Strings(keys) var lineSql = "" - //根据key从m中拿元素,就是按顺序拿了 for _, k := range keys { - sql += k + "," switch mymap[k].(type) { case string: lineSql += "'" + mymap[k].(string) + "'," @@ -154,26 +142,49 @@ func addRecord(tableName string, pk string, jsonStr string) { } } lineSql = lineSql[0 : len(lineSql)-1] - sql = sql[0 : len(sql)-1] - sql += ") values(" + lineSql + ");" + insertStrArray = append(insertStrArray, lineSql) } else { fmt.Println(err.Error()) } - sqls = append(sqls, sql) } //提交 -func batchSave(sqls []string) { - fmt.Println(sqls) - //_, err := db.SQL(sqls[0:2]).Execute() - //if err != nil { - // panic(err) - //} - - //for i := range sqls { - // _, err := db.SQL(sqls[i]).Execute() - // if err != nil { - // panic(err) - // } - //} +func batchSave(tableName string, pkName string) { + //fmt.Println(pkStrArray) + //fmt.Println(insertStrArray) + var pkStr = "" + for i := range pkStrArray { + pkStr += "'" + pkStrArray[i] + "'," + } + pkStr = pkStr[0 : len(pkStr)-1] + sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr+")" + pgDb.SQL(sql).Execute() + /* + //插入 + sql = `insert into ` + tableName + "(" + var lineSql = "" + + //根据key从m中拿元素,就是按顺序拿了 + for _, k := range keys { + sql += k + "," + switch mymap[k].(type) { + case string: + lineSql += "'" + mymap[k].(string) + "'," + break + case int64: + lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + "," + break + case float64: + lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + "," + break + default: + break + } + } + lineSql = lineSql[0 : len(lineSql)-1] + sql = sql[0 : len(sql)-1] + sql += ") values(" + lineSql + ");" + } else { + fmt.Println(err.Error()) + */ }