From 467f34088a0e12ee36be009177d49bdb420fc4c1 Mon Sep 17 00:00:00 2001 From: zhangjun <53766543@qq.com> Date: Sun, 2 Aug 2020 17:22:05 +0800 Subject: [PATCH] kafka 2 es --- dsDataex/Config/Config.ini | 2 +- .../DataEX/DataexService/DataexService.go | 4 ++++ dsDataex/MyService/DataEX/KafkaData.go | 1 + .../Kafka2ESService/Kafka2ESService.go | 6 +++--- .../Kafka2ES/Kafka2ESTask/Kafka2ESTask.go | 19 ++++++++++++++---- dsDataex/Utils/ES7Util/ES7Util.go | 20 +++++++++++++++++++ dsDataex/Utils/KafkaUtil/KafkaUtil.go | 15 ++++++++++++-- 7 files changed, 57 insertions(+), 10 deletions(-) diff --git a/dsDataex/Config/Config.ini b/dsDataex/Config/Config.ini index 64178af2..87e4206a 100644 --- a/dsDataex/Config/Config.ini +++ b/dsDataex/Config/Config.ini @@ -25,7 +25,7 @@ expireTime = 86400 [kafka] brokers = 10.10.14.238:9092, ;brokers = 192.168.0.200:9092,192.168.0.200:9091 -partition = 8 +partition = 20 replication = 1 process_no = 1 diff --git a/dsDataex/MyService/DataEX/DataexService/DataexService.go b/dsDataex/MyService/DataEX/DataexService/DataexService.go index 8363c323..3b6a67b0 100644 --- a/dsDataex/MyService/DataEX/DataexService/DataexService.go +++ b/dsDataex/MyService/DataEX/DataexService/DataexService.go @@ -6,6 +6,7 @@ import ( "dsDataex/MyService/DataEX/DataexDAO" "dsDataex/MyService/MySwagger" "dsDataex/Utils/CacheUtil" + "dsDataex/Utils/CommonUtil" "dsDataex/Utils/ES7Util" "dsDataex/Utils/KafkaUtil" "encoding/json" @@ -288,6 +289,9 @@ func DataexCollect(systemID string,users []MySwagger.User,events []MySwagger.Eve data.DatasourceId=datasource.DatasourceCode data.CollectTime=now + //add by zhangjun 2020-08-02 + data.DataId=CommonUtil.GetUUID() + if data.AccessIP==""{ data.AccessIP=ip } diff --git a/dsDataex/MyService/DataEX/KafkaData.go b/dsDataex/MyService/DataEX/KafkaData.go index a2978a4f..5b12ec07 100644 --- a/dsDataex/MyService/DataEX/KafkaData.go +++ b/dsDataex/MyService/DataEX/KafkaData.go @@ -4,6 +4,7 @@ type KafkaData struct { SystemId string `json:"system_id"` DatasourceId string `json:"datasource_id"` + DataId string `json:"data_id"` UserID string `json:"user_id"` Identity string `json:"identity"` diff --git a/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go b/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go index 166addf8..7a50514b 100644 --- a/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go +++ b/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go @@ -118,7 +118,7 @@ func KafkaProcess(topic string, procNo int) { for no := 0; no < procNo; no++ { - fmt.Printf("Dataex Consume Kafka ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, "group_"+topic, no) + fmt.Printf("Dataex Kafka2ES Process Start,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, "group_"+topic, no) //开启子线程 go KafkaUtil.Consume(topic, "group_"+topic, no) @@ -129,7 +129,7 @@ func KafkaProcess(topic string, procNo int) { for no := 0; no < len(KafkaUtil.StateTopicProc[topic]); no++ { if KafkaUtil.StateTopicProc[topic][no] == false { - fmt.Printf("Dataex Consume Kafka ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, "group_"+topic, no) + fmt.Printf("Dataex Kafka2ES Process Start,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, "group_"+topic, no) KafkaUtil.StateTopicProc[topic][no] = true KafkaUtil.CountTopicProc[topic][no] = 0 @@ -148,7 +148,7 @@ func LogProcess() { if len(KafkaUtil.CountTopicProc[k])>0{ for no:=0;no< len(v);no++{ - fmt.Println("Kafka Consume Topic:"+k,"【",time.Now(),"】 Process Total:",v[no]) + fmt.Println("[Kafka] ["+k+"] "+time.Now().Format("2006/01/02 15:04:05")+" Process message total:",v[no]) } } } diff --git a/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go b/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go index ade6e579..0d72ea66 100644 --- a/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go +++ b/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go @@ -3,7 +3,6 @@ package Kafka2ESTask import ( "dsDataex/MyReport/ESSql/ESSqlService" "dsDataex/MyService/DataEX" - "dsDataex/Utils/CommonUtil" "dsDataex/Utils/ES7Util" "dsDataex/Utils/GeoIPUtil" "encoding/json" @@ -17,6 +16,7 @@ var loc sync.Mutex func Process(topic string,msg kafka.Message){ + //now:=time.Now() var myMsg DataEX.KafkaData var myDoc DataEX.ESData @@ -28,7 +28,9 @@ func Process(topic string,msg kafka.Message){ myDoc.EnableFlag=1 myDoc.DelFlag=0 - myDoc.DataId=CommonUtil.GetUUID() + //change by zhangjun 2020-08-02 + //myDoc.DataId=CommonUtil.GetUUID() + myDoc.DataId=myMsg.DataId //myDoc.BeginTime = DataEX.JsonDate(time.Now()) //myDoc.EndTime = DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location())) @@ -66,6 +68,8 @@ func Process(topic string,msg kafka.Message){ } } + //fmt.Println("Step 1+2 :",time.Now().Sub(now).Milliseconds()) + //3、补充 User_ID 对映的机构信息 flag,userDetail :=ESSqlService.GetUser4Kafka(myMsg.UserID,myMsg.Identity) @@ -145,6 +149,7 @@ func Process(topic string,msg kafka.Message){ } } + //fmt.Println("Step 3 :",time.Now().Sub(now).Milliseconds()) //4、补充 Access_IP 对映的所在地信息 var geo=GeoIPUtil.GetGeo4IP(myMsg.AccessIP) @@ -160,7 +165,7 @@ func Process(topic string,msg kafka.Message){ myDoc.DataContent["access_cityname"]="" } - + //fmt.Println("Step 4 :",time.Now().Sub(now).Milliseconds()) //TODO: 创建 ES 索引,此处并发执行,来不及判断、创建索引 //add by zhangjun 2020-07-30 //loc.Lock() @@ -176,5 +181,11 @@ func Process(topic string,msg kafka.Message){ //loc.Unlock() - ES7Util.IndexDocAdd(myMsg.DatasourceId, &myDoc) + //change by zhangjun 2020-08-02 + //ES7Util.IndexDocDel(myMsg.DatasourceId,myMsg.DataId) + //ES7Util.IndexDocAdd(myMsg.DatasourceId, &myDoc) + + go ES7Util.IndexDocAdd2(myMsg.DatasourceId, &myDoc) + + //fmt.Println("Step 5 :",time.Now().Sub(now).Milliseconds()) } \ No newline at end of file diff --git a/dsDataex/Utils/ES7Util/ES7Util.go b/dsDataex/Utils/ES7Util/ES7Util.go index fecb0953..7c2793fe 100644 --- a/dsDataex/Utils/ES7Util/ES7Util.go +++ b/dsDataex/Utils/ES7Util/ES7Util.go @@ -228,6 +228,26 @@ func IndexDocAdd(indexName string,indexData *DataEX.ESData) (bool,string,error){ } } +/** + * @Author zhangjun + * @Description Kafka 2 ES + * @Date 2020-08-02 02:44 + * @Param + * @return + **/ +func IndexDocAdd2(indexName string,indexData *DataEX.ESData) (bool,string,error){ + indexData.BeginTime = DataEX.JsonDate(time.Now()); + indexData.EndTime =DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location())) + + result, err := ES7Client.Index().Index(indexName).Id(indexData.DataId).BodyJson(indexData).Do(CTX) + + if result.Result=="created" { + return true,"文档操作成功",nil + }else{ + return false,"文档操作失败",err + } +} + /** * @Author zhangjun * @Description 批量添加文档 diff --git a/dsDataex/Utils/KafkaUtil/KafkaUtil.go b/dsDataex/Utils/KafkaUtil/KafkaUtil.go index e0b83ac3..9dabc497 100644 --- a/dsDataex/Utils/KafkaUtil/KafkaUtil.go +++ b/dsDataex/Utils/KafkaUtil/KafkaUtil.go @@ -91,8 +91,18 @@ func ProvideLow(topic string, datas []DataEX.KafkaData) (bool, string) { if f == true { client = c + + //add by zhangjun 2020-08-02 判断链接超时,异常关闭 + _, err:= client.ReadLastOffset() + if err!=nil{ + client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, num) + + kafkaPool[topic][num] = client + } + } else { client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, num) + kafkaPool[topic][num] = client } } @@ -203,7 +213,7 @@ myLoop: StateTopicProc[topic][index] = false - fmt.Printf("KafkaUtil Consume Close ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index) + fmt.Printf("Dataex Kafka2ES Process Close ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index) return } //case <- ticker.C: @@ -224,6 +234,7 @@ myLoop: break myLoop } + //TODO:此处不能使用 go,否则速度太快,数据会丢失,以后可以考虑优化ES集群 Kafka2ESTask.Process(topic, msg) //count++ @@ -235,7 +246,7 @@ myLoop: StateTopicProc[topic][index] = false r.Close() - fmt.Printf("KafkaUtil Consume Stop ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index) + fmt.Printf("Dataex Kafka2ES Process Stop ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index) } /**