// Package Kafka2ESTask turns Kafka event messages into ES documents and
// indexes them through ES7Util.
package Kafka2ESTask

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"dsDataex/MyReport/ESSql/ESSqlService"
	"dsDataex/MyService/DataEX"
	"dsDataex/Utils/CommonUtil"
	"dsDataex/Utils/ES7Util"
	"dsDataex/Utils/GeoIPUtil"

	"github.com/segmentio/kafka-go"
)

// loc is a package-level mutex. The Lock/Unlock calls that used it around the
// index-existence check in Process are currently commented out.
var loc sync.Mutex

// userFieldString returns v as a string, or "" when v is nil or holds any
// other type. It replaces a bare v.(string) assertion, which would panic.
func userFieldString(v interface{}) string {
	s, _ := v.(string)
	return s
}

// userFieldInt returns v as an int for the numeric types handled below, or 0
// when v is nil or holds any other type. It replaces a bare v.(float64)
// assertion, which would panic.
func userFieldInt(v interface{}) int {
	switch n := v.(type) {
	case float64:
		return int(n)
	case int:
		return n
	case int64:
		return int(n)
	}
	return 0
}

// Process decodes one Kafka message into a DataEX.KafkaData event, builds the
// matching DataEX.ESData document and adds it to the index named after the
// event's DatasourceId.
//
// topic is only used for logging. msg.Value must hold the JSON-encoded event;
// if it cannot be decoded the error is printed and the message is dropped
// instead of indexing an empty or partially decoded document.
//
// The document is enriched with:
//   - the event's own fields plus its user and event properties,
//   - the user record returned by ESSqlService.GetUser4Kafka,
//   - the location returned by GeoIPUtil.GetGeo4IP for the access IP.
func Process(topic string, msg kafka.Message) {
	var myMsg DataEX.KafkaData
	var myDoc DataEX.ESData

	if err := json.Unmarshal(msg.Value, &myMsg); err != nil {
		fmt.Println("Kafka2ESTask.Process json.Unmarshal failed,topic:"+topic+",error:", err)
		return
	}

	// 1. Initialise the fixed part of the ES index document.
	myDoc.DatasourceId = myMsg.DatasourceId
	myDoc.SystemId = myMsg.SystemId
	myDoc.EnableFlag = 1
	myDoc.DelFlag = 0
	myDoc.DataId = CommonUtil.GetUUID()

	//myDoc.BeginTime = DataEX.JsonDate(time.Now())
	//myDoc.EndTime = DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location()))

	// 2. Data_Content part.
	myDoc.DataContent = make(map[string]interface{})
	myDoc.DataContent["collect_time"] = myMsg.CollectTime
	myDoc.DataContent["event_seqno"] = myMsg.EventSeqNO
	myDoc.DataContent["event_uri"] = myMsg.EventURI
	myDoc.DataContent["event_time"] = myMsg.EventTime
	myDoc.DataContent["event_name"] = myMsg.EventName
	myDoc.DataContent["event_type"] = myMsg.EventType
	myDoc.DataContent["access_ip"] = myMsg.AccessIP
	myDoc.DataContent["user_identity"] = myMsg.Identity
	myDoc.DataContent["access_way"] = myMsg.AccessWay
	myDoc.DataContent["access_id"] = myMsg.AccessID
	myDoc.DataContent["user_id"] = myMsg.UserID

	// User and event properties never overwrite a key that is already set;
	// user properties win over event properties because they are copied first.
	for k, v := range myMsg.UserProperty {
		if _, ok := myDoc.DataContent[k]; !ok {
			myDoc.DataContent[k] = v
		}
	}
	for k, v := range myMsg.EventProperty {
		if _, ok := myDoc.DataContent[k]; !ok {
			myDoc.DataContent[k] = v
		}
	}

	// 3. Add the organisation information that corresponds to User_ID.
	found, userDetail := ESSqlService.GetUser4Kafka(myMsg.UserID, myMsg.Identity)
	if found && len(userDetail) > 0 {
		if len(userDetail) > 1 {
			fmt.Println("ESSqlService.GetUser4Kafka have more than 1 results,user_id:" + myMsg.UserID)
		}

		// Only the first result is used. Known keys go to dedicated document
		// fields; a nil or unexpectedly typed value leaves the field at its
		// zero value instead of panicking. Unknown keys are copied into
		// DataContent unless the key is already present.
		for k, v := range userDetail[0] {
			switch k {
			case "province_code":
				myDoc.ProvinceId = userFieldString(v)
			case "province_name":
				myDoc.ProvinceName = userFieldString(v)
			case "city_code":
				myDoc.CityId = userFieldString(v)
			case "city_name":
				myDoc.CityName = userFieldString(v)
			case "district_code":
				myDoc.AreaId = userFieldString(v)
			case "district_name":
				myDoc.AreaName = userFieldString(v)
			case "bureau_id":
				myDoc.BureauId = userFieldString(v)
			case "region_id":
				myDoc.RegionId = userFieldString(v)
			case "main_id":
				myDoc.MainId = userFieldString(v)
			case "org_id":
				myDoc.OrgId = userFieldString(v)
			case "org_name":
				myDoc.OrgName = userFieldString(v)
			case "org_type":
				myDoc.OrgType = userFieldInt(v)
			case "school_type":
				myDoc.SchoolType = userFieldString(v)
			case "school_typename":
				myDoc.SchoolTypeName = userFieldString(v)
			case "dept_id":
				myDoc.DeptId = userFieldString(v)
			case "stage_id":
				myDoc.StageId = userFieldString(v)
			case "grade_id":
				myDoc.GradeId = userFieldString(v)
			case "class_id":
				myDoc.ClassId = userFieldString(v)
			default:
				if _, ok := myDoc.DataContent[k]; !ok {
					myDoc.DataContent[k] = v
				}
			}
		}
	}

	// 4. Add the location information that corresponds to Access_IP.
	// All four elements are read below, so require at least four; a shorter
	// result is treated like "no result" rather than indexing out of range.
	geo := GeoIPUtil.GetGeo4IP(myMsg.AccessIP)
	if len(geo) >= 4 {
		myDoc.DataContent["access_provincecode"] = geo[0]
		myDoc.DataContent["access_provincename"] = geo[1]
		myDoc.DataContent["access_citycode"] = geo[2]
		myDoc.DataContent["access_cityname"] = geo[3]
	} else {
		myDoc.DataContent["access_provincecode"] = ""
		myDoc.DataContent["access_provincename"] = ""
		myDoc.DataContent["access_citycode"] = ""
		myDoc.DataContent["access_cityname"] = ""
	}

	// TODO: create the ES index; this code runs concurrently, so there is not
	// enough time to check for and create the index before other callers do.
	// add by zhangjun 2020-07-30
	// NOTE(review): with the lock below disabled, concurrent callers can both
	// see "index missing" and both call IndexInit — confirm that is acceptable.
	//loc.Lock()
	// NOTE(review): the second return value of IndexExist is ignored here, as
	// in the original; its type is not visible from this file.
	exists, _ := ES7Util.IndexExist(myMsg.DatasourceId)
	if !exists {
		ES7Util.IndexInit(myMsg.DatasourceId, myDoc.DataContent)

		// Pause briefly after requesting index creation before adding the
		// first document.
		time.Sleep(time.Millisecond * 100)
	}
	//loc.Unlock()

	ES7Util.IndexDocAdd(myMsg.DatasourceId, &myDoc)
}