You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
119 lines
2.6 KiB
119 lines
2.6 KiB
package main
|
|
|
|
import (
|
|
"context"
|
|
"dsSupport/Utils/CommonUtil"
|
|
"dsSupport/Utils/PgUtil"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/olivere/elastic/v7"
|
|
"github.com/tidwall/gjson"
|
|
"log"
|
|
"os"
|
|
"time"
|
|
)
|
|
|
|
// 深度分页
|
|
// https://www.cnblogs.com/hello-shf/p/11543453.html
|
|
|
|
var client elastic.Client
|
|
var db = PgUtil.Engine
|
|
|
|
func main() {
|
|
var host = "http://10.10.14.188:9200/"
|
|
esClient, err := elastic.NewClient(
|
|
elastic.SetURL(host),
|
|
elastic.SetSniff(false),
|
|
elastic.SetHealthcheckInterval(10*time.Second),
|
|
elastic.SetGzip(true),
|
|
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
|
|
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
|
|
)
|
|
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
//要同步的索引名称,也就是表名称
|
|
indexName := "org_school"
|
|
//取所有
|
|
CTX := context.Background()
|
|
result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
//第一次的结果集
|
|
for i := range result.Hits.Hits {
|
|
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
|
|
resStr := string(resByte)
|
|
value := gjson.Get(resStr, "data_content")
|
|
addRecord(indexName, value.String())
|
|
}
|
|
//继续用的 scoll_id
|
|
scrollId := result.ScrollId
|
|
|
|
//第一次命中的个数
|
|
var nowCount = int64(len(result.Hits.Hits))
|
|
//总的命中个数
|
|
allCount := result.TotalHits()
|
|
|
|
//开始循环
|
|
for {
|
|
//如果还有数据没有获取
|
|
if allCount > nowCount {
|
|
result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX)
|
|
scrollId = result.ScrollId
|
|
nowCount += int64(len(result.Hits.Hits))
|
|
|
|
for i := range result.Hits.Hits {
|
|
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
|
|
resStr := string(resByte)
|
|
value := gjson.Get(resStr, "data_content")
|
|
addRecord(indexName, value.String())
|
|
}
|
|
} else {
|
|
//没有数据了
|
|
break
|
|
}
|
|
}
|
|
}
|
|
func addRecord(tableName string, jsonStr string) {
|
|
sql := `insert into ` + tableName + "("
|
|
var mymap map[string]interface{}
|
|
if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil {
|
|
for key, _ := range mymap {
|
|
sql += key + ","
|
|
}
|
|
sql = sql[0 : len(sql)-1]
|
|
sql += ") values("
|
|
|
|
for _, value := range mymap {
|
|
switch value.(type) {
|
|
case string:
|
|
sql += "'" + value.(string) + "',"
|
|
break
|
|
case int64:
|
|
sql += CommonUtil.ConvertInt64ToString(value.(int64)) + ","
|
|
break
|
|
case float64:
|
|
sql += fmt.Sprintf("%d", int64(value.(float64)))+ ","
|
|
break
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
sql = sql[0 : len(sql)-1]
|
|
} else {
|
|
fmt.Println(err.Error())
|
|
}
|
|
|
|
sql += ");"
|
|
fmt.Println(sql)
|
|
_,err:=db.SQL(sql).Execute()
|
|
if err!=nil{
|
|
fmt.Println("成功插入一条数据!")
|
|
}else{
|
|
panic(err.Error())
|
|
}
|
|
}
|