diff --git a/dsSupport/Test/Elasticsearch/ReadElasticsearch.go b/dsSupport/Test/Elasticsearch/ReadElasticsearchDynamicTable.go similarity index 100% rename from dsSupport/Test/Elasticsearch/ReadElasticsearch.go rename to dsSupport/Test/Elasticsearch/ReadElasticsearchDynamicTable.go diff --git a/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go b/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go new file mode 100644 index 00000000..0c872fea --- /dev/null +++ b/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go @@ -0,0 +1,99 @@ +package main + +import ( + "context" + "dsSupport/Utils/GpUtil" + "encoding/csv" + "encoding/json" + "fmt" + "github.com/olivere/elastic/v7" + "github.com/tidwall/gjson" + "log" + "os" + "time" +) + +// 深度分页 +// https://www.cnblogs.com/hello-shf/p/11543453.html + +var client elastic.Client +var db = GpUtil.Engine + +func main() { + var host = "http://10.10.14.188:9200/" + esClient, err := elastic.NewClient( + elastic.SetURL(host), + elastic.SetSniff(false), + elastic.SetHealthcheckInterval(10*time.Second), + elastic.SetGzip(true), + elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), + elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), + ) + + if err != nil { + panic(err) + } + + //要同步的索引名称,也就是表名称 + indexName := "org_school" + //取所有 + CTX := context.Background() + result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) + if err != nil { + panic(err) + } + //第一次的结果集 + for i := range result.Hits.Hits { + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + resStr := string(resByte) + value := gjson.Get(resStr, "data_content") + fmt.Println(value.String()) + } + //继续用的 scoll_id + scrollId := result.ScrollId + + //第一次命中的个数 + var nowCount = int64(len(result.Hits.Hits)) + //总的命中个数 + allCount := result.TotalHits() + + //开始循环 + for { + //如果还有数据没有获取 + if allCount > nowCount { + result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) + scrollId = result.ScrollId + nowCount += int64(len(result.Hits.Hits)) + + for i := range result.Hits.Hits { + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + jsonStr := string(resByte) + m := make(map[string]interface{}) + json.Unmarshal([]byte(jsonStr), &m) + for k, v := range m { + fmt.Printf("%v: %v\n", k, v) + } + } + } else { + //没有数据了 + break + } + } +} + +func SaveToCsv(csvFileName string, _map []map[string]interface{}) { + file, err := os.OpenFile(csvFileName, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + fmt.Println("open file is failed, err: ", err) + } + defer file.Close() + // 写入UTF-8 BOM,防止中文乱码 + file.WriteString("\xEF\xBB\xBF") + w := csv.NewWriter(file) + for i := range _map { + _map[i] + } + w.Write([]string{"开发者名称", "开发者邮箱", "应用名称"}) + // 写文件需要flush,不然缓存满了,后面的就写不进去了,只会写一部分 + w.Flush() +}