package Kafka2ESTask

import (
	"encoding/json"
	"fmt"
	"time"

	"dsDataex/MyReport/ESSql/ESSqlService"
	"dsDataex/MyService/DataEX"
	"dsDataex/Utils/CommonUtil"
	"github.com/segmentio/kafka-go"
)

// Process decodes the JSON payload of one Kafka message into a
// DataEX.KafkaData and assembles the matching DataEX.ESData document.
//
// topic is not used by this function. msg.Value is expected to hold the JSON
// encoding of a DataEX.KafkaData; when it cannot be decoded the error is
// printed and the message is skipped instead of producing a document built
// from zero values.
//
// NOTE(review): in the code visible here the assembled document is neither
// returned nor written anywhere, and step 4 is still a placeholder — confirm
// whether the indexing step lives elsewhere or is yet to be written.
func Process(topic string, msg kafka.Message) {
	var myMsg DataEX.KafkaData
	if err := json.Unmarshal(msg.Value, &myMsg); err != nil {
		// Do not continue with an empty message: it would trigger a user
		// lookup for an empty user ID.
		fmt.Println("Kafka2ESTask.Process json.Unmarshal error:", err)
		return
	}

	// 1. Initialise the fixed part of the ES index document.
	var myDoc DataEX.ESData
	myDoc.DatasourceId = myMsg.DatasourceId
	myDoc.SystemId = myMsg.SystemId
	myDoc.EnableFlag = 1
	myDoc.DelFlag = 0
	myDoc.DataId = CommonUtil.GetUUID()

	// Read the clock once so BeginTime and the EndTime location agree.
	now := time.Now()
	myDoc.BeginTime = DataEX.JsonDate(now)
	// Fixed far-future end time (9999-09-09 09:09:09) in the local zone.
	myDoc.EndTime = DataEX.JsonDate(time.Date(9999, 9, 9, 9, 9, 9, 0, now.Location()))

	// 2. Data_Content part: the well-known event fields come first.
	myDoc.DataContent = make(map[string]interface{})
	myDoc.DataContent["collect_time"] = myMsg.CollectTime
	myDoc.DataContent["event_seqno"] = myMsg.EventSeqNO
	myDoc.DataContent["event_uri"] = myMsg.EventURI
	myDoc.DataContent["event_time"] = myMsg.EventTime
	myDoc.DataContent["event_name"] = myMsg.EventName
	myDoc.DataContent["event_type"] = myMsg.EventType
	myDoc.DataContent["access_ip"] = myMsg.AccessIP
	myDoc.DataContent["user_identity"] = myMsg.Identity
	myDoc.DataContent["access_way"] = myMsg.AccessWay
	myDoc.DataContent["access_id"] = myMsg.AccessID
	myDoc.DataContent["user_id"] = myMsg.UserID

	// User and event properties only fill keys that are not set yet, so they
	// never overwrite the fields above; user properties take precedence over
	// event properties because they are merged first.
	for k, v := range myMsg.UserProperty {
		if _, ok := myDoc.DataContent[k]; !ok {
			myDoc.DataContent[k] = v
		}
	}
	for k, v := range myMsg.EventProperty {
		if _, ok := myDoc.DataContent[k]; !ok {
			myDoc.DataContent[k] = v
		}
	}

	// 3. Supplement the organisation information that corresponds to User_ID.
	flag, userDetail := ESSqlService.GetUser4Kafka(myMsg.UserID, myMsg.Identity)
	if flag {
		if len(userDetail) == 1 {
			// Again, only keys that are still missing are added.
			for k, v := range userDetail[0] {
				if _, ok := myDoc.DataContent[k]; !ok {
					myDoc.DataContent[k] = v
				}
			}
		} else {
			// NOTE(review): this branch also runs when the lookup yields z
			// rows, although the message only mentions "more than 1".
			fmt.Println("ESSqlService.GetUser4Kafka have more than 1 results,user_id:" + myMsg.UserID)
		}
	}

	// 4. Supplement the location information that corresponds to Access_IP.
}