|
|
|
@ -1,110 +0,0 @@
|
|
|
|
|
package main
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"dsSupport/Utils/GpUtil"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"github.com/olivere/elastic/v7"
|
|
|
|
|
"github.com/tidwall/gjson"
|
|
|
|
|
"log"
|
|
|
|
|
"os"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// 深度分页
|
|
|
|
|
// https://www.cnblogs.com/hello-shf/p/11543453.html
|
|
|
|
|
|
|
|
|
|
var client elastic.Client
|
|
|
|
|
var db = GpUtil.Engine
|
|
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
var host = "http://10.10.14.188:9200/"
|
|
|
|
|
esClient, err := elastic.NewClient(
|
|
|
|
|
elastic.SetURL(host),
|
|
|
|
|
elastic.SetSniff(false),
|
|
|
|
|
elastic.SetHealthcheckInterval(10*time.Second),
|
|
|
|
|
elastic.SetGzip(true),
|
|
|
|
|
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
|
|
|
|
|
//elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//要同步的索引名称,也就是表名称
|
|
|
|
|
indexName := "org_school"
|
|
|
|
|
//取所有
|
|
|
|
|
CTX := context.Background()
|
|
|
|
|
result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
//第一次的结果集
|
|
|
|
|
for i := range result.Hits.Hits {
|
|
|
|
|
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
|
|
|
|
|
resStr := string(resByte)
|
|
|
|
|
value := gjson.Get(resStr, "data_content")
|
|
|
|
|
//判断表是不是已存在
|
|
|
|
|
|
|
|
|
|
//如果是第一次的话而且表不存在的情况下,重新创建表结构
|
|
|
|
|
m := make(map[string]interface{})
|
|
|
|
|
json.Unmarshal([]byte(value.String()), &m)
|
|
|
|
|
if len(m) > 0 && i == 0 {
|
|
|
|
|
CreateTable(indexName, m)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//继续用的 scoll_id
|
|
|
|
|
scrollId := result.ScrollId
|
|
|
|
|
|
|
|
|
|
//第一次命中的个数
|
|
|
|
|
var nowCount = int64(len(result.Hits.Hits))
|
|
|
|
|
//总的命中个数
|
|
|
|
|
allCount := result.TotalHits()
|
|
|
|
|
|
|
|
|
|
//开始循环
|
|
|
|
|
for {
|
|
|
|
|
//如果还有数据没有获取
|
|
|
|
|
if allCount > nowCount {
|
|
|
|
|
result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX)
|
|
|
|
|
scrollId = result.ScrollId
|
|
|
|
|
nowCount += int64(len(result.Hits.Hits))
|
|
|
|
|
|
|
|
|
|
for i := range result.Hits.Hits {
|
|
|
|
|
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
|
|
|
|
|
jsonStr := string(resByte)
|
|
|
|
|
m := make(map[string]interface{})
|
|
|
|
|
json.Unmarshal([]byte(jsonStr), &m)
|
|
|
|
|
for k, v := range m {
|
|
|
|
|
fmt.Printf("%v: %v\n", k, v)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
//没有数据了
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
功能:创建一张INDEX对应的表
|
|
|
|
|
*/
|
|
|
|
|
func CreateTable(tableName string, m map[string]interface{}) {
|
|
|
|
|
createTableSql := `CREATE TABLE "public"."` + tableName + `" (` + "\r\n"
|
|
|
|
|
var c = ""
|
|
|
|
|
for k, v := range m {
|
|
|
|
|
switch v.(type) {
|
|
|
|
|
case float64:
|
|
|
|
|
c = "float8"
|
|
|
|
|
case string:
|
|
|
|
|
c = "varchar(128) " + ` COLLATE "pg_catalog"."default"`
|
|
|
|
|
}
|
|
|
|
|
createTableSql += `"` + k + `" ` + c + ",\r\n"
|
|
|
|
|
}
|
|
|
|
|
createTableSql = createTableSql[0 : len(createTableSql)-3]
|
|
|
|
|
createTableSql += "\r\n" + `);` + "\r\n"
|
|
|
|
|
//创建表
|
|
|
|
|
db.SQL(createTableSql).Execute()
|
|
|
|
|
fmt.Println("成功创建表:"+tableName+"~")
|
|
|
|
|
}
|