You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

152 lines
3.6 KiB

package main
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"sort"
	"strings"
	"time"

	"dsSupport/Utils/CommonUtil"
	"dsSupport/Utils/PgUtil"

	"github.com/olivere/elastic/v7"
	"github.com/tidwall/gjson"
)
// Deep pagination.
// Background reading: https://www.cnblogs.com/hello-shf/p/11543453.html
var (
	// client is a package-level Elasticsearch client value; nothing in the
	// visible part of this file reads or assigns it.
	client elastic.Client

	// db is the shared database engine exported by PgUtil; every SQL
	// statement in this file is issued through it.
	db = PgUtil.Engine
)
// main copies every document of one Elasticsearch index into the database
// table that carries the same name.
//
// Documents are read page by page through the scroll API. For each hit the
// "data_content" field of its source is handed to addRecord, which replaces
// the row identified by the table's primary key.
func main() {
	var host = "http://10.10.14.188:9200/"
	esClient, err := elastic.NewClient(
		elastic.SetURL(host),
		elastic.SetSniff(false),
		elastic.SetHealthcheckInterval(10*time.Second),
		elastic.SetGzip(true),
		elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
		elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
	)
	if err != nil {
		panic(err)
	}

	// Name of the index to synchronise; it doubles as the table name.
	indexName := "org_school"
	pk := primaryKeyColumn(indexName)

	ctx := context.Background()

	// One scroll service is reused for all pages: it remembers the scroll id
	// returned by the previous request, so each Do call fetches the next page.
	scroll := esClient.Scroll().Index(indexName).Size(10)
	defer func() {
		// Release the server-side scroll context instead of waiting for it
		// to time out.
		if err := scroll.Clear(ctx); err != nil {
			log.Printf("clear scroll: %v", err)
		}
	}()

	for {
		page, err := scroll.Do(ctx)
		if errors.Is(err, io.EOF) {
			// The scroll service reports "no more hits" as io.EOF; this also
			// covers an index that is empty from the start.
			break
		}
		if err != nil {
			panic(err)
		}
		for _, hit := range page.Hits.Hits {
			content := gjson.Get(string(hit.Source), "data_content")
			if !content.Exists() {
				log.Printf("hit %q has no data_content field, skipped", hit.Id)
				continue
			}
			addRecord(indexName, pk, content.String())
		}
	}
}

// primaryKeyColumn returns the name of the first column of tableName's
// primary-key constraint, as recorded in the PostgreSQL system catalogs.
//
// Only conkey[1] is inspected, so for a composite primary key the remaining
// columns are not reported. The function panics when the lookup fails, when
// the table has no primary key, or when the column name is not a string.
func primaryKeyColumn(tableName string) string {
	const query = `SELECT
pg_constraint.conname AS pk_name,
pg_attribute.attname AS colname,
pg_type.typname AS typename
FROM
pg_constraint
INNER JOIN pg_class ON pg_constraint.conrelid = pg_class.oid
INNER JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid
AND pg_attribute.attnum = pg_constraint.conkey [ 1 ]
INNER JOIN pg_type ON pg_type.oid = pg_attribute.atttypid
WHERE
pg_class.relname = ?
AND pg_constraint.contype = 'p'`

	rows, err := db.SQL(query, tableName).Query().List()
	if err != nil {
		panic(fmt.Sprintf("look up primary key of %q: %v", tableName, err))
	}
	if len(rows) == 0 {
		panic(fmt.Sprintf("table %q has no primary key constraint", tableName))
	}
	name, ok := rows[0]["colname"].(string)
	if !ok {
		panic(fmt.Sprintf("primary key column name of %q has unexpected type %T", tableName, rows[0]["colname"]))
	}
	return name
}
// addRecord replaces one row of tableName with the record encoded in jsonStr.
//
// jsonStr must be a JSON object whose keys are column names. The row whose
// pk column equals the record's pk value is deleted first (only when that
// value is a JSON string or number), then the record is inserted.
//
// A payload that cannot be decoded is reported on stdout and skipped; an
// empty or null object is skipped silently. Any database error panics.
//
// NOTE(review): tableName, pk and the JSON keys are spliced into the
// statements as identifiers and must therefore come from a trusted source.
// The delete and the insert are two separate statements, not a transaction.
func addRecord(tableName string, pk string, jsonStr string) {
	var row map[string]interface{}
	if err := json.Unmarshal([]byte(jsonStr), &row); err != nil {
		// Without a decoded record there is no statement to run.
		fmt.Println(err.Error())
		return
	}
	if len(row) == 0 {
		return
	}

	// Delete first, then insert.
	deleteSQL := `delete from ` + tableName + ` where ` + pk + `=?`
	switch id := row[pk].(type) {
	case string:
		if _, err := db.SQL(deleteSQL, id).Execute(); err != nil {
			panic(err.Error())
		}
	case float64:
		if _, err := db.SQL(deleteSQL, int64(id)).Execute(); err != nil {
			panic(err.Error())
		}
	}

	insertSQL, args := buildInsertSQL(tableName, row)
	fmt.Println(insertSQL)
	if _, err := db.SQL(insertSQL, args...).Execute(); err != nil {
		panic(err.Error())
	}
	fmt.Println("成功插入一条数据!")
}

// buildInsertSQL renders an INSERT statement for row together with the
// arguments bound to its "?" placeholders.
//
// Columns are emitted in sorted order so the statement is deterministic.
// Whole numbers are written as integer literals; every other value is bound
// as a parameter, so quotes inside strings cannot break the statement:
// strings, booleans and fractional numbers are passed as they are, JSON null
// is passed as nil, and nested objects or arrays are passed as their JSON
// text.
//
// Numbers arrive as float64 from encoding/json, so integers beyond 2^53 may
// already have lost precision before they reach this function.
func buildInsertSQL(tableName string, row map[string]interface{}) (string, []interface{}) {
	columns := make([]string, 0, len(row))
	for column := range row {
		columns = append(columns, column)
	}
	sort.Strings(columns)

	values := make([]string, 0, len(columns))
	args := make([]interface{}, 0, len(columns))
	for _, column := range columns {
		switch v := row[column].(type) {
		case float64:
			// Only values that are integral and fit into int64 are converted.
			if v == math.Trunc(v) && math.Abs(v) < 1<<63 {
				values = append(values, CommonUtil.ConvertInt64ToString(int64(v)))
				continue
			}
			args = append(args, v)
		case map[string]interface{}, []interface{}:
			encoded, err := json.Marshal(v)
			if err != nil {
				panic(err.Error())
			}
			args = append(args, string(encoded))
		default:
			args = append(args, v)
		}
		values = append(values, "?")
	}

	statement := `insert into ` + tableName + "(" + strings.Join(columns, ",") +
		") values(" + strings.Join(values, ",") + ");"
	return statement, args
}