You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

191 lines
4.4 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package Kafka2ESTask
import (
"dsDataex/MyReport/ESSql/ESSqlService"
"dsDataex/MyService/DataEX"
"dsDataex/Utils/ES7Util"
"dsDataex/Utils/GeoIPUtil"
"encoding/json"
"fmt"
"github.com/segmentio/kafka-go"
"sync"
"time"
)
// loc appears intended to serialize the "index exists? / create index" step in
// Process. The corresponding Lock/Unlock calls in Process are currently
// commented out, so this mutex is not used by the code visible here.
var loc sync.Mutex
// Process converts one Kafka message into an Elasticsearch document and hands
// it to ES7Util.IndexDocAdd2 for asynchronous indexing.
//
// Steps:
//  1. Decode msg.Value (JSON) into a DataEX.KafkaData. A payload that fails to
//     decode is logged and skipped instead of being indexed as an empty
//     document.
//  2. Copy the fixed document fields and the event fields into the document;
//     user/event properties are added only when they do not collide with a
//     key that is already present in DataContent.
//  3. Enrich the document with the first row returned by
//     ESSqlService.GetUser4Kafka for the message's user. Values with an
//     unexpected type are logged and left at their zero value rather than
//     panicking.
//  4. Enrich DataContent with the geo information returned by
//     GeoIPUtil.GetGeo4IP for the access IP (empty strings when unavailable).
//  5. Make sure the target index exists, then index the document in a new
//     goroutine.
//
// topic is only used in log output. msg is the Kafka message to process.
func Process(topic string, msg kafka.Message) {
	var myMsg DataEX.KafkaData
	if err := json.Unmarshal(msg.Value, &myMsg); err != nil {
		fmt.Printf("Kafka2ESTask.Process: cannot decode message from topic %q: %v\n", topic, err)
		return
	}

	// 1. Fixed part of the ES index document.
	var myDoc DataEX.ESData
	myDoc.DatasourceId = myMsg.DatasourceId
	myDoc.SystemId = myMsg.SystemId
	myDoc.EnableFlag = 1
	myDoc.DelFlag = 0
	// The document id is taken from the message instead of being generated
	// here (changed by zhangjun 2020-08-02).
	myDoc.DataId = myMsg.DataId

	// 2. Data_Content part.
	myDoc.DataContent = make(map[string]interface{})
	myDoc.DataContent["collect_time"] = myMsg.CollectTime
	myDoc.DataContent["event_seqno"] = myMsg.EventSeqNO
	myDoc.DataContent["event_uri"] = myMsg.EventURI
	myDoc.DataContent["event_time"] = myMsg.EventTime
	myDoc.DataContent["event_name"] = myMsg.EventName
	myDoc.DataContent["event_type"] = myMsg.EventType
	myDoc.DataContent["access_ip"] = myMsg.AccessIP
	myDoc.DataContent["user_identity"] = myMsg.Identity
	myDoc.DataContent["access_way"] = myMsg.AccessWay
	myDoc.DataContent["access_id"] = myMsg.AccessID
	myDoc.DataContent["user_id"] = myMsg.UserID

	// setIfAbsent stores v under key unless DataContent already has that key,
	// so earlier (more authoritative) values are never overwritten.
	setIfAbsent := func(key string, v interface{}) {
		if _, exists := myDoc.DataContent[key]; !exists {
			myDoc.DataContent[key] = v
		}
	}
	for k, v := range myMsg.UserProperty {
		setIfAbsent(k, v)
	}
	for k, v := range myMsg.EventProperty {
		setIfAbsent(k, v)
	}

	// 3. Add the organization information that belongs to User_ID.
	//
	// str returns v as a string. A nil or non-string value yields "" (the
	// field's zero value) instead of panicking; non-nil mismatches are logged.
	str := func(key string, v interface{}) string {
		s, ok := v.(string)
		if !ok && v != nil {
			fmt.Printf("Kafka2ESTask.Process: user field %q has unexpected type %T (user_id: %v)\n", key, v, myMsg.UserID)
		}
		return s
	}

	found, userDetail := ESSqlService.GetUser4Kafka(myMsg.UserID, myMsg.Identity)
	if found && len(userDetail) > 0 {
		if len(userDetail) > 1 {
			fmt.Println("ESSqlService.GetUser4Kafka have more than 1 results,user_id:" + myMsg.UserID)
		}
		// Only the first row is used.
		for k, v := range userDetail[0] {
			switch k {
			case "province_code":
				myDoc.ProvinceId = str(k, v)
			case "province_name":
				myDoc.ProvinceName = str(k, v)
			case "city_code":
				myDoc.CityId = str(k, v)
			case "city_name":
				myDoc.CityName = str(k, v)
			case "district_code":
				myDoc.AreaId = str(k, v)
			case "district_name":
				myDoc.AreaName = str(k, v)
			case "bureau_id":
				myDoc.BureauId = str(k, v)
			case "region_id":
				myDoc.RegionId = str(k, v)
			case "main_id":
				myDoc.MainId = str(k, v)
			case "org_id":
				myDoc.OrgId = str(k, v)
			case "org_name":
				myDoc.OrgName = str(k, v)
			case "org_type":
				// NOTE(review): the value is expected to be a float64
				// (presumably a JSON-decoded number) - confirm against
				// GetUser4Kafka.
				if n, ok := v.(float64); ok {
					myDoc.OrgType = int(n)
				} else if v != nil {
					fmt.Printf("Kafka2ESTask.Process: user field %q has unexpected type %T (user_id: %v)\n", k, v, myMsg.UserID)
				}
			case "school_type":
				myDoc.SchoolType = str(k, v)
			case "school_typename":
				myDoc.SchoolTypeName = str(k, v)
			case "dept_id":
				myDoc.DeptId = str(k, v)
			case "stage_id":
				myDoc.StageId = str(k, v)
			case "grade_id":
				myDoc.GradeId = str(k, v)
			case "class_id":
				myDoc.ClassId = str(k, v)
			default:
				// Unknown columns are carried along in DataContent.
				setIfAbsent(k, v)
			}
		}
	}

	// 4. Add the location information that belongs to Access_IP.
	geo := GeoIPUtil.GetGeo4IP(myMsg.AccessIP)
	if len(geo) >= 4 {
		// Four entries are read below, so require all of them to be present.
		myDoc.DataContent["access_provincecode"] = geo[0]
		myDoc.DataContent["access_provincename"] = geo[1]
		myDoc.DataContent["access_citycode"] = geo[2]
		myDoc.DataContent["access_cityname"] = geo[3]
	} else {
		myDoc.DataContent["access_provincecode"] = ""
		myDoc.DataContent["access_provincename"] = ""
		myDoc.DataContent["access_citycode"] = ""
		myDoc.DataContent["access_cityname"] = ""
	}

	// 5. Create the ES index if needed, then index the document.
	//
	// TODO: this runs concurrently, so there is not enough time to check for
	// and create the index before other callers reach this point (added by
	// zhangjun 2020-07-30). The lock below was disabled; it is kept as a
	// marker of where serialization would go.
	// loc.Lock()
	//
	// The second result of IndexExist is discarded as in the original code; a
	// failed check therefore falls through to IndexInit.
	exists, _ := ES7Util.IndexExist(myMsg.DatasourceId)
	if !exists {
		ES7Util.IndexInit(myMsg.DatasourceId, myDoc.DataContent)
		// NOTE(review): presumably gives ES a moment to make the new index
		// available before the first document is written - confirm.
		time.Sleep(time.Millisecond * 100)
	}
	// loc.Unlock()

	// Indexing is fire-and-forget (changed by zhangjun 2020-08-02; the earlier
	// synchronous IndexDocDel/IndexDocAdd calls were replaced by IndexDocAdd2).
	// myDoc is not touched after this point.
	go ES7Util.IndexDocAdd2(myMsg.DatasourceId, &myDoc)
}