You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
dsMin/dsSupport/Test/Elasticsearch/ReadElasticsearchDynamicTab...

111 lines
2.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package main
import (
"context"
"dsSupport/Utils/GpUtil"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"github.com/tidwall/gjson"
"log"
"os"
"time"
)
// 深度分页
// https://www.cnblogs.com/hello-shf/p/11543453.html
var client elastic.Client
var db = GpUtil.Engine
func main() {
var host = "http://10.10.14.188:9200/"
esClient, err := elastic.NewClient(
elastic.SetURL(host),
elastic.SetSniff(false),
elastic.SetHealthcheckInterval(10*time.Second),
elastic.SetGzip(true),
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
//elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
)
if err != nil {
panic(err)
}
//要同步的索引名称,也就是表名称
indexName := "org_school"
//取所有
CTX := context.Background()
result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX)
if err != nil {
panic(err)
}
//第一次的结果集
for i := range result.Hits.Hits {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte)
value := gjson.Get(resStr, "data_content")
//判断表是不是已存在
//如果是第一次的话而且表不存在的情况下,重新创建表结构
m := make(map[string]interface{})
json.Unmarshal([]byte(value.String()), &m)
if len(m) > 0 && i == 0 {
CreateTable(indexName, m)
}
}
//继续用的 scoll_id
scrollId := result.ScrollId
//第一次命中的个数
var nowCount = int64(len(result.Hits.Hits))
//总的命中个数
allCount := result.TotalHits()
//开始循环
for {
//如果还有数据没有获取
if allCount > nowCount {
result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX)
scrollId = result.ScrollId
nowCount += int64(len(result.Hits.Hits))
for i := range result.Hits.Hits {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
jsonStr := string(resByte)
m := make(map[string]interface{})
json.Unmarshal([]byte(jsonStr), &m)
for k, v := range m {
fmt.Printf("%v: %v\n", k, v)
}
}
} else {
//没有数据了
break
}
}
}
/**
功能创建一张INDEX对应的表
*/
func CreateTable(tableName string, m map[string]interface{}) {
createTableSql := `CREATE TABLE "public"."` + tableName + `" (` + "\r\n"
var c = ""
for k, v := range m {
switch v.(type) {
case float64:
c = "float8"
case string:
c = "varchar(128) " + ` COLLATE "pg_catalog"."default"`
}
createTableSql += `"` + k + `" ` + c + ",\r\n"
}
createTableSql = createTableSql[0 : len(createTableSql)-3]
createTableSql += "\r\n" + `);` + "\r\n"
//创建表
db.SQL(createTableSql).Execute()
fmt.Println("成功创建表:"+tableName+"~")
}