You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

790 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package ES7Util
import (
"context"
"dsSupport/Utils/CommonUtil"
"dsSupport/Utils/ConfigUtil"
"dsSupport/Utils/ES7Util/DataEX"
"dsSupport/Utils/ES7Util/MySwagger"
"dsSupport/Utils/RedisUtil"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"reflect"
"strconv"
"strings"
"time"
)
var ES7Client *elastic.Client
var ServerVersion string
var CTX context.Context
/**
* @Author zhangjun
* @Description ES7 客户端工具初始化
* @Date 2020-06-12 16:34
* @Param
* @return
**/
func init() {
//初始化ES Client使用配置文件
ES7Client, _ = elastic.NewClient(elastic.SetURL(ConfigUtil.ESAddress...))
CTX=context.Background()
//Ping获取ES server启动信息
info, _, _ := ES7Client.Ping(ConfigUtil.ESAddress[0]).Do(CTX)
ServerVersion =ES7Client.String() +"【"+ info.Version.Number+"】"
}
/**
* @Author zhangjun
* @Description 创建ES索引
* @Date 2020-06-12 14:31
* @Param indexName string ES索引名称【相当于数据库表名】
* @Param dataMap string ES索引Mapping【相当于数据库表定义】
* @return bool 成功/失败
* @return error 异常
**/
func IndexCreate(indexName string,dataMap string) (bool,error){
const emptyMap = `
{
"settings":{
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings":{
}
}`
if dataMap==""{
dataMap=emptyMap
}
createIndex, err := ES7Client.CreateIndex(indexName).Body(dataMap).Pretty(true).Do(CTX)
if createIndex.Acknowledged {
return true,nil
} else{
return false,err
}
}
/**
* @Author zhangjun
* @Description 判断ES索引是否存在,并同步到 Redis 缓存
* @Date 2020-06-12 14:42
* @Param indexName string ES索引名称【相当于数据库表名】
* @return bool 成功/失败
* @return error 异常
**/
func IndexExist(indexName string) (bool,error){
count,_:= RedisUtil.RedisClient.Exists("es_index_"+indexName).Result()
if count>0 {
return true,nil
}else{
exists, _ := ES7Client.IndexExists(indexName).Do(CTX)
if exists {
RedisUtil.SET("es_index_"+indexName,indexName,10*time.Hour)
return true,nil
}else {
return false,nil
}
}
}
/**
* @Author zhangjun
* @Description 使用一条索引数据创建索引,并删除这条数据
* @Date 2020-06-12 14:38
* @Param indexName string ES索引名称【相当于数据库表名】
* @Param data map[string]interface{} 一条索引数据
* @return bool 成功/失败
* @return error 异常
**/
func IndexInit(indexName string,dataex map[string]interface{}) (bool,error) {
IndexCreate(indexName,"")
begin:= time.Now();
end:=time.Date(9999,9,9,9,9,9,0,begin.Location())
//dataex := make(map[string]interface{})
//dataex["org_id"]="123"
//dataex["org_name"]="长春市第一中学"
//dataex["org_type"]=2
var myMap=make(map[string]interface{})
for key, val := range dataex {
switch val.(type){
case int,int32,int64:
new,_:=strconv.ParseFloat(val.(string),64)
myMap[key]=new +0.1
case float64:
new,_:=val.(float64)
myMap[key]=new +0.1
default:
myMap[key]=val
}
}
dataJson := DataEX.ESData{SystemId: "BASE001",
DatasourceId: indexName,
ProvinceId: "220000",
CityId: "220100",
AreaId: "220101",
RegionId: "R22010101",
BureauId: "B22010101",
MainId: "M2201010101",
OrgId: "O22010101010101",
OrgType: 1,
GradeId: "G2020",
ClassId: "C202001",
DeptId: "D20200001",
StageId: "S0001",
DataId: "D22010101010101",
DataContent: myMap,
FileUri: "/path/file",
BeginTime: DataEX.JsonDate(begin) ,
EndTime: DataEX.JsonDate(end),
DelFlag: -1,
EnableFlag: 1,
}
result, err := ES7Client.Index().Index(indexName).Id("2020-2020").BodyJson(dataJson).Do(CTX)
if result.Result=="created" {
//change by zhangjun 2020-06-30
//ES7Client.Delete().Index(indexName).Id("2020-2020").Do(CTX)
res,_:= ES7Client.GetMapping().Index(indexName).Do(CTX)
bytes,_:=json.Marshal( res[indexName] )
mapping :=string(bytes)
new :=strings.Replace(mapping,"\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}},","",-1)
new=strings.Replace(new,"{\"mappings\"","{\"settings\":{\"number_of_shards\": 3,\"number_of_replicas\": 1},\"mappings\"",-1)
new=strings.Replace(new,"\"text\"","\"keyword\"",-1)
//Delete the dafault date format
//new=strings.Replace(new ,"\"format\":\"yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis\",","",-1)
//new=strings.Replace(new,"\"date\"}","\"date\",\"format\":\"yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis\"}",-1)
ES7Client.DeleteIndex(indexName).Do(CTX)
ES7Client.CreateIndex(indexName).Body(new).Pretty(true).Do(CTX)
return true,nil
}else{
return false,err
}
}
/**
* @Author zhangjun
* @Description 获取ES索引 Mapping 结构
* @Date 2020-06-12 17:07
* @Param indexName string ES索引名称【相当于数据库表名】
* @return map[string]interface mapping data_content数据
**/
func IndexDatamap(indexName string) map[string]interface{} {
result,_:= ES7Client.GetMapping().Index(indexName).Do(CTX)
return result["data_content"].(map[string]interface{})
}
/**
* @Author zhangjun
* @Description 添加文档
* @Date 2020-06-15 16:52
* @Param indexName string
* @Param data ESData 文档数据
* @return
**/
func IndexDocAdd(indexName string,indexData *DataEX.ESData) (bool,string,error){
indexData.BeginTime = DataEX.JsonDate(time.Now());
indexData.EndTime =DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location()))
newid:=CommonUtil.GetUUID()
result, err := ES7Client.Index().Index(indexName).Id(newid).BodyJson(indexData).Do(CTX)
if result.Result=="created" {
return true,"文档操作成功",nil
}else{
return false,"文档操作失败",err
}
}
/**
* @Author zhangjun
* @Description Kafka 2 ES
* @Date 2020-08-02 02:44
* @Param
* @return
**/
func IndexDocAdd2(indexName string,indexData *DataEX.ESData){
//defer func() {
// if err := recover(); err != nil {
// fmt.Println("IndexDocAdd2 Panic Recover :", err)
// }
//}()
indexData.BeginTime = DataEX.JsonDate(time.Now());
indexData.EndTime =DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location()))
res, err := ES7Client.Index().Index(indexName).Id(indexData.DataId).BodyJson(indexData).Do(CTX)
if err != nil {
fmt.Println("IndexDocAdd2 Error :" +err.Error())
}else {
if res.Result == "created" {
//fmt.Println("IndexDocAdd2 Result :" ,res)
}
if res.Result == "updated"{
//fmt.Println("IndexDocAdd2 Result :" ,res)
}
}
}
/**
* @Author zhangjun
* @Description 批量添加文档
* @Date 2020-06-17 09:31
* @Param indexName string
* @Param indexData 【】ESData 文档数据数组
* @return bool 成功/失败
* @return error 错误
* @return 【】string 成功数据ID数组
* @return 【】map【string】string 失败数据map数组
**/
func IndexDocAddBatch(indexName string,indexData []*DataEX.ESData) (bool,error,[]string,[]MySwagger.FailResult){
var requests []elastic.BulkableRequest
var tempMap =make(map[string]string)
var IDs []string
var Fails []MySwagger.FailResult
for no:=0;no< len(indexData);no++{
//indexData[no].BeginTime = time.Now();
//indexData[no].EndTime =time.Date(9999,9,9,9,9,9,0,time.Now().Location())
newid:=CommonUtil.GetUUID()
//文档ID 绑定 数据ID
tempMap[newid]=indexData[no].DataId
IDs=append(IDs,indexData[no].DataId)
req := elastic.NewBulkIndexRequest()
req.Index(indexName).Id(newid).Doc(indexData[no])
requests=append(requests,req)
}
//批量添加索引文档,只返回错误记录
result, _ := ES7Client.Bulk().Index(indexName).Add(requests...).FilterPath("items.*.error").Do(CTX)
//result, err := ES7Client.Bulk().Index(indexName).Add(requests...).Do(CTX)
ES7Client.Refresh(indexName).Do(CTX)
if len(result.Items) ==0 {
return true,nil,IDs,nil
}else {
for no:=0;no< len(result.Items);no++{
reason :=result.Items[no]["index"].Error.Reason
docID := reason[strings.Index(reason,"document with id '")+ 18:strings.Index(reason,"document with id '") + 18 + 36]
dataID:= tempMap[docID]
IDs=remove(IDs,dataID)
var fail MySwagger.FailResult
fail.FailID=dataID
fail.FailReason=reason
Fails =append(Fails,fail)
}
return true,nil,IDs,Fails
}
}
/**
* @Author zhangjun
* @Description 逻辑删除相同dataID的文档
* @Date 2020-06-16 15:59
* @Param indexName string 索引名称
* @Param dataID string 数据ID
* @return
**/
func IndexDocDel(indexName string,dataID string) (bool,string,error){
//change by zhangjun 2020-07-01
//term := elastic.NewTermQuery("data_id.keyword", dataID)
term := elastic.NewTermQuery("data_id", dataID)
term2 := elastic.NewTermQuery("enable_flag", 1)
query:=elastic.NewBoolQuery()
query.Filter(term).Filter(term2)
script := elastic.NewScript("ctx._source.enable_flag = 0 ; ctx._source.end_time = params.now ").Param("now", time.Now())
response,err:= ES7Client.UpdateByQuery().Index(indexName).Query(query).Script(script).Do(CTX)
if err !=nil{
return false,"文档修改失败",err
}
if response.Updated >0{
return true,"文档修改成功",nil
}
return true,"文档修改成功,没有符合筛选条件的文档",nil
}
/**
* @Author zhangjun
* @Description 批量逻辑删除
* @Date 2020-06-17 09:33
* @Param indexName string 索引名称
* @Param dataIDs 【】string 数据ID数组
* @return
**/
func IndexDocDelBatch(indexName string, end time.Time, dataIDs []string) (bool,string,error){
//change by zhangjun 2020-07-01
//term := elastic.NewTermsQuery("data_id.keyword", string2interface(dataIDs)...)
term := elastic.NewTermsQuery("data_id", string2interface(dataIDs)...)
term2 := elastic.NewTermQuery("enable_flag", 1)
query:=elastic.NewBoolQuery()
query.Filter(term).Filter(term2)
script := elastic.NewScript("ctx._source.enable_flag = 0 ; ctx._source.end_time = params.now ").Param("now", DataEX.JsonDate(end))
response,err:= ES7Client.UpdateByQuery().Index(indexName).Query(query).Script(script).Do(CTX)
ES7Client.Refresh(indexName).Do(CTX)
if err !=nil{
return false,"文档批量修改失败",err
}
if response.Updated >0{
return true,"文档批量修改成功",nil
}
return true,"文档批量修改成功,没有符合筛选条件的文档",nil
}
func IndexDocDelBatch2(indexName string,end time.Time, dataIDs []string) (bool,string,error){
//change by zhangjun 2020-07-01
//term := elastic.NewTermsQuery("data_id.keyword", string2interface(dataIDs)...)
term := elastic.NewTermsQuery("data_id", string2interface(dataIDs)...)
term2 := elastic.NewTermQuery("enable_flag", 1)
term3 := elastic.NewRangeQuery("begin_time").Lt(end)
query:=elastic.NewBoolQuery()
query.Filter(term).Filter(term2).Filter(term3)
script := elastic.NewScript("ctx._source.enable_flag = 0 ; ctx._source.end_time = params.now ").Param("now", DataEX.JsonDate(end))
response,err:= ES7Client.UpdateByQuery().Index(indexName).Query(query).Script(script).Do(CTX)
ES7Client.Refresh(indexName).Do(CTX)
if err !=nil{
return false,"文档批量修改失败",err
}
if response.Updated >0{
return true,"文档批量修改成功",nil
}
return true,"文档批量修改成功,没有符合筛选条件的文档",nil
}
func IndexDocDelRestore(indexName string, end time.Time, dataIDs []string) (bool,string,error){
//change by zhangjun 2020-07-01
//term := elastic.NewTermsQuery("data_id.keyword", string2interface(dataIDs)...)
term := elastic.NewTermsQuery("data_id", string2interface(dataIDs)...)
term2 := elastic.NewTermQuery("enable_flag", 0)
term3 := elastic.NewTermQuery("end_time",end)
query:=elastic.NewBoolQuery()
query.Filter(term).Filter(term2).Filter(term3)
newEnd :=time.Date(9999,9,9,9,9,9,0,time.Now().Location())
script := elastic.NewScript("ctx._source.enable_flag = 1 ; ctx._source.end_time = params.now ").Param("now", DataEX.JsonDate(newEnd))
response,err:= ES7Client.UpdateByQuery().Index(indexName).Query(query).Script(script).Do(CTX)
ES7Client.Refresh(indexName).Do(CTX)
if err !=nil{
return false,"文档批量修改失败",err
}
if response.Updated >0{
return true,"文档批量修改成功",nil
}
return true,"文档批量修改成功,没有符合筛选条件的文档",nil
}
func IndexDocGet(indexName string,dataID string) (bool,string,DataEX.ESData) {
var data DataEX.ESData
//change by zhangjun 2020-07-01
//term := elastic.NewTermQuery("data_id.keyword", dataID)
term := elastic.NewTermQuery("data_id", dataID)
term2 := elastic.NewTermQuery("enable_flag", 1)
query:=elastic.NewBoolQuery()
query.Filter(term).Filter(term2)
result,err:=ES7Client.Search().Index(indexName).Query(query).Do(CTX)
if err == nil{
var ttyp DataEX.ESData
for _, item := range result.Each(reflect.TypeOf(ttyp)) {
data = item.(DataEX.ESData)
break
}
return true,"获取数据成功", data
}else{
return false,"获取数据失败", data
}
}
func IndexDocPage(indexName string,orgIDs []string,page int,begin string) (bool,string,[]DataEX.ESData) {
var list []DataEX.ESData
term2 := elastic.NewTermQuery("enable_flag", 1)
query:=elastic.NewBoolQuery()
query.Filter(term2)
if len(orgIDs)!=0{
//change by zhangjun 2020-07-01
//term := elastic.NewTermsQuery("org_id.keyword", string2interface(orgIDs)...)
term := elastic.NewTermsQuery("org_id", string2interface(orgIDs)...)
query.Filter(term)
}
if begin!=""{
term3 := elastic.NewRangeQuery("begin_time").Gte(begin)
query.Filter(term3)
}
result,err:=ES7Client.Search().Index(indexName).Query(query).From(page * 100).Size(100).Sort("begin_time",true).Do(CTX)
if err == nil{
var ttyp DataEX.ESData
for _, item := range result.Each(reflect.TypeOf(ttyp)) {
data := item.(DataEX.ESData)
list=append(list,data)
}
return true,"获取数据成功", list
}else{
return false,"获取数据失败", list
}
}
func IndexDocQuery(indexName string,orgIDs []string,page int,conditions []string) (bool,string,[]DataEX.ESData) {
var list []DataEX.ESData
term2 := elastic.NewTermQuery("enable_flag", 1)
query:=elastic.NewBoolQuery()
query.Filter(term2)
if len(orgIDs)!=0{
//change by zhangjun 2020-07-01
//term := elastic.NewTermsQuery("org_id.keyword", string2interface(orgIDs)...)
term := elastic.NewTermsQuery("org_id", string2interface(orgIDs)...)
query.Filter(term)
}
if len(conditions)>0{
for no:=0;no< len(conditions);no++{
if strings.Contains(conditions[no],">="){
arr:=strings.Split(conditions[no],">=")
term3 := elastic.NewRangeQuery(strings.TrimSpace(arr[0])).Gte(strings.TrimSpace(arr[1]))
query.Filter(term3)
continue
}
if strings.Contains(conditions[no],"<="){
arr:=strings.Split(conditions[no],"<=")
term3 := elastic.NewRangeQuery(strings.TrimSpace(arr[0])).Lte(strings.TrimSpace(arr[1]))
query.Filter(term3)
continue
}
if strings.Contains(conditions[no],"="){
arr:=strings.Split(conditions[no],"=")
term3 := elastic.NewTermQuery(strings.TrimSpace(arr[0]) ,strings.TrimSpace(arr[1]))
query.Filter(term3)
continue
}
if strings.Contains(conditions[no],">"){
arr:=strings.Split(conditions[no],">")
term3 := elastic.NewRangeQuery(strings.TrimSpace(arr[0])).Gt(strings.TrimSpace(arr[1]))
query.Filter(term3)
continue
}
if strings.Contains(conditions[no],"<"){
arr:=strings.Split(conditions[no],"<")
term3 := elastic.NewRangeQuery(strings.TrimSpace(arr[0])).Lt(strings.TrimSpace(arr[1]))
query.Filter(term3)
continue
}
}
}
result,err:=ES7Client.Search().Index(indexName).Query(query).From(page * 100).Size(100).Sort("begin_time",true).Do(CTX)
if err == nil{
var ttyp DataEX.ESData
for _, item := range result.Each(reflect.TypeOf(ttyp)) {
data := item.(DataEX.ESData)
list=append(list,data)
}
return true,"获取数据成功", list
}else{
return false,"获取数据失败", list
}
}
func remove(array []string,del string)[]string {
var result []string
for no:=0;no< len(array);no++{
if array[no]!=del{
result=append(result,array[no])
}
}
return result
}
func IndexRefresh(indexName string){
ES7Client.Refresh(indexName).Do(CTX)
}
/**
* @Author zhangjun
* @Description 字符数组转换为接口数组否则terms query会报错很坑
* @Date 2020-06-17 16:10
* @Param
* @return
**/
func string2interface(arr []string) []interface{} {
var result []interface{}
for no:=0;no< len(arr);no++{
result=append(result,arr[no])
}
return result
}
/**
* @title SearchDocPage
* @Description 检索文档
* @Author wangshuai
* @Date 2020-09-16
* @Param indexName string 数据源CODE
* @Param orgIDs []string 机构ID集合
* @Param begin string 开始时间
* @Param conditions map[string]interface{} 查询条件
* @Param sort map[string]interface{} 排序
* @Return bool 执行结果
* @Return string 提示信息
* @Return []DataEX.ESData 文档集合
*/
func SearchDocPage(indexName string,orgIDs []string,page int,begin string, conditions map[string]interface{}, sort map[string]interface{}) (bool,string,[]DataEX.ESData) {
var list []DataEX.ESData
var field string
var ascending bool
term2 := elastic.NewTermQuery("enable_flag", 1)
query:=elastic.NewBoolQuery()
query.Filter(term2)
if len(orgIDs)!=0{
term := elastic.NewTermsQuery("org_id", string2interface(orgIDs)...)
query.Filter(term)
}
if page > 0 {
page = page - 1
}
if begin!=""{
term3 := elastic.NewRangeQuery("begin_time").Gte(begin)
query.Filter(term3)
}
if conditions != nil {
for k, v := range conditions {
condition := elastic.NewTermsQuery(k, v)
query.Filter(condition)
}
}
if sort == nil {
field = "begin_time"
ascending = true
} else {
field = sort["field"].(string)
ascending = sort["ascending"].(bool)
}
result,err:=ES7Client.Search().Index(indexName).Query(query).From(page * 100).Size(100).Sort(field, ascending).Do(CTX)
fmt.Println(result)
if err == nil{
var ttyp DataEX.ESData
for _, item := range result.Each(reflect.TypeOf(ttyp)) {
data := item.(DataEX.ESData)
list=append(list,data)
}
return true,"获取数据成功", list
}else{
return false,"获取数据失败", list
}
}
/**
* @title IndexDataContentMapping
* @Description 获取ES索引 Mapping 结构
* @Author wangshuai
* @Date 2020-09-16
* @Param indexName string 数据源CODE
* @Return map[string]interface{} data_content mapping 数据
*/
func IndexDataContentMapping(indexName string) map[string]interface{} {
result,_:= ES7Client.GetMapping().Index(indexName).Do(CTX)
//fmt.Println(result["user_student"].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})["data_content"].(map[string]interface{})["properties"])
return result[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})["data_content"].(map[string]interface{})["properties"].(map[string]interface{})
}
/**
* @title GetDocCount
* @Description 获取文档数量
* @Author wangshuai
* @Date 2020-09-16
* @Param indexName string 数据源CODE
* @Param begin string 开始时间
* @Param conditions map[string]interface{} 查询条件
* @Return int 数量
* @Return error 错误信息
*/
func GetDocCount(indexName string, begin string, conditions map[string]interface{}) (int, error) {
term2 := elastic.NewTermQuery("enable_flag", 1)
query:=elastic.NewBoolQuery()
query.Filter(term2)
if begin!=""{
term3 := elastic.NewRangeQuery("begin_time").Gte(begin)
query.Filter(term3)
}
if conditions != nil {
for k, v := range conditions {
condition := elastic.NewTermsQuery(k, v)
query.Filter(condition)
}
}
count64, err := ES7Client.Count(indexName).Query(query).Do(CTX)
count := CommonUtil.ConvertInt64ToInt(count64)
if err != nil {
return count, nil
}else {
return count, err
}
}
/**
* @title GetLatestDoc
* @Description 获取最新一条文档
* @Author wangshuai
* @Date 2020-09-16
* @Param indexName string 数据源CODE
* @Return bool 执行结果
* @Return string 提示信息
* @Return DataEX.ESData 文档数据
*/
func GetLatestDoc(indexName string) (bool,string,DataEX.ESData) {
var data DataEX.ESData
var list []DataEX.ESData
term2 := elastic.NewTermQuery("enable_flag", 1)
query:=elastic.NewBoolQuery()
query.Filter(term2)
result,err:=ES7Client.Search().Index(indexName).Query(query).From(1).Size(1).Sort("begin_time", false).Do(CTX)
if err == nil{
var ttyp DataEX.ESData
for _, item := range result.Each(reflect.TypeOf(ttyp)) {
data := item.(DataEX.ESData)
list = append(list,data)
}
return true,"获取数据成功", list[0]
}else{
return false,"获取数据失败", data
}
}
// 暂时作废 2020-09-16
func GetDoc(indexName string, dataId string) (bool,string,DataEX.ESData) {
var data DataEX.ESData
var list []DataEX.ESData
term2 := elastic.NewTermQuery("enable_flag", 1)
//query.Filter(term2)
term3 := elastic.NewTermQuery("data_id", dataId)
query:=elastic.NewBoolQuery()
query.Filter(term2).Filter(term3)
count, err := ES7Client.Count(indexName).Query(query).Do(CTX)
fmt.Println("count= ", count)
result,err:=ES7Client.Search().Index(indexName).Query(query).From(0).Sort("begin_time", false).Size(1).Do(CTX)
fmt.Println(result)
if err == nil{
var ttyp DataEX.ESData
//fmt.Println("reflect.TypeOf(ttyp)=", reflect.TypeOf(ttyp))
for _, item := range result.Each(reflect.TypeOf(ttyp)) {
//fmt.Println("item=", item)
data := item.(DataEX.ESData)
//fmt.Println("data=", data)
list = append(list,data)
}
fmt.Println("list=", list)
return true,"获取数据成功", data
}else{
return false,"获取数据失败", data
}
}