// NOTE: removed web-UI scrape residue (Gitea topic-limit text, file-size line,
// and ambiguous-Unicode warning) that preceded the Go source and made the file
// invalid Go.
package main
import (
"context"
"dsSupport/Utils/CommonUtil"
"dsSupport/Utils/PgUtil"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"github.com/tidwall/gjson"
"log"
"os"
"sort"
"time"
)
// Deep-pagination (Elasticsearch scroll) full sync.
// TODO:
// Later, record each index's last sync time in a log file; for now this is a
// one-shot full sync.
// Because rows are keyed by the business primary key, the warehouse holds fewer
// records than ES: intermediate update-log entries are never synced, only the
// final effective business IDs come across.
// Reference: https://www.cnblogs.com/hello-shf/p/11543453.html
var client elastic.Client // NOTE(review): appears unused in this file — candidate for removal
var db = PgUtil.Engine // shared DB engine from the project's PgUtil package
var sqls = make([]string, 0) // SQL statements accumulated for one scroll page, flushed by batchSave
var keyMap = make(map[string]int, 0) // NOTE(review): appears unused in this file
// main performs a one-shot full sync of a single Elasticsearch index into the
// warehouse table of the same name: it discovers the table's primary-key
// column from the Postgres catalog, then scrolls through the whole index in
// pages of 1000 documents, converting each document's "data_content" payload
// into delete+insert statements that batchSave flushes per page.
func main() {
	const host = "http://10.10.14.188:9200/"
	esClient, err := elastic.NewClient(
		elastic.SetURL(host),
		elastic.SetSniff(false),
		elastic.SetHealthcheckInterval(10*time.Second),
		elastic.SetGzip(true),
		elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
		elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
	)
	if err != nil {
		panic(err)
	}

	// Index to sync; it doubles as the warehouse table name.
	indexName := "org_school"

	// Look up the table's single-column primary key from the PG catalog.
	sql := `SELECT
	pg_constraint.conname AS pk_name,
	pg_attribute.attname AS colname,
	pg_type.typname AS typename
FROM
	pg_constraint
	INNER JOIN pg_class ON pg_constraint.conrelid = pg_class.oid
	INNER JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid
	AND pg_attribute.attnum = pg_constraint.conkey [ 1 ]
	INNER JOIN pg_type ON pg_type.oid = pg_attribute.atttypid
WHERE
	pg_class.relname = ?
	AND pg_constraint.contype = 'p'`
	list, err := db.SQL(sql, indexName).Query().List()
	if err != nil {
		// The original discarded this error and then indexed list[0],
		// which panics with a less useful message when the query fails.
		panic(err)
	}
	if len(list) == 0 {
		panic("no primary key found for table " + indexName)
	}
	pk := list[0]["colname"].(string)

	ctx := context.Background()

	// First scroll page.
	result, err := esClient.Scroll().Index(indexName).Size(1000).Do(ctx)
	if err != nil {
		panic(err)
	}
	appendHits(result.Hits.Hits, indexName, pk)
	batchSave(sqls)

	scrollID := result.ScrollId
	fetched := int64(len(result.Hits.Hits))
	total := result.TotalHits()

	// Keep scrolling until every hit has been fetched.
	for total > fetched {
		sqls = sqls[:0] // reset the batch, keeping capacity
		result, err = esClient.Scroll().ScrollId(scrollID).Do(ctx)
		if err != nil {
			// The original ignored this error and dereferenced a nil
			// result on the next line. (io.EOF here would mean the
			// scroll ran dry early; anything else is a hard failure.)
			panic(err)
		}
		if len(result.Hits.Hits) == 0 {
			// Defensive: a zero-hit page would otherwise loop forever
			// because fetched stops advancing.
			break
		}
		scrollID = result.ScrollId
		fetched += int64(len(result.Hits.Hits))
		appendHits(result.Hits.Hits, indexName, pk)
		batchSave(sqls)
	}
}

// appendHits converts one page of scroll hits into SQL statements via
// addRecord. Each hit's _source is re-serialized and its "data_content"
// field (the actual row payload) extracted with gjson.
func appendHits(hits []*elastic.SearchHit, indexName, pk string) {
	for _, hit := range hits {
		raw, err := json.Marshal(hit.Source)
		if err != nil {
			fmt.Println(err.Error())
			continue
		}
		addRecord(indexName, pk, gjson.Get(string(raw), "data_content").String())
	}
}
//后续优化为到达一定的阀值再提交一次初步定为200一次
func addRecord(tableName string, pk string, jsonStr string) {
var mymap map[string]interface{}
var sql = ""
if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil {
//拿到key
var keys []string
for k := range mymap {
keys = append(keys, k)
}
//对key排序
sort.Strings(keys)
//先删除再插入
sql = `delete from ` + tableName + ` where ` + pk + `=`
switch (mymap[pk]).(type) {
case string:
sqls = append(sqls, sql+"'"+mymap[pk].(string)+"';")
break
case float64:
sqls = append(sqls, sql+CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64)))+";")
break
default:
break
}
//插入
sql = `insert into ` + tableName + "("
var lineSql = ""
//根据key从m中拿元素就是按顺序拿了
for _, k := range keys {
sql += k + ","
switch mymap[k].(type) {
case string:
lineSql += "'" + mymap[k].(string) + "',"
break
case int64:
lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + ","
break
case float64:
lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + ","
break
default:
break
}
}
lineSql = lineSql[0 : len(lineSql)-1]
sql = sql[0 : len(sql)-1]
sql += ") values(" + lineSql + ");"
} else {
fmt.Println(err.Error())
}
sqls = append(sqls, sql)
}
//提交
func batchSave(sqls []string) {
fmt.Println(sqls)
//_, err := db.SQL(sqls[0:2]).Execute()
//if err != nil {
// panic(err)
//}
//for i := range sqls {
// _, err := db.SQL(sqls[i]).Execute()
// if err != nil {
// panic(err)
// }
//}
}