From c30298cadfe2f8703ad817c15bf4aae6e7bc9bbb Mon Sep 17 00:00:00 2001 From: zhangjun <53766543@qq.com> Date: Sun, 2 Aug 2020 13:37:55 +0800 Subject: [PATCH] kafka 2 es --- .../DataEX/DataexService/DataexService.go | 4 +++ dsDataex/MyService/MySwagger/Auth.go | 2 +- dsDataex/MyService/MySwagger/DataGet.go | 4 +-- dsDataex/MyService/MySwagger/DataPage.go | 6 ++--- dsDataex/MyService/MySwagger/DataQuery.go | 6 ++--- dsDataex/MyService/MySwagger/Event.go | 2 +- dsDataex/MyService/MySwagger/User.go | 4 +-- .../Kafka2ESService/Kafka2ESService.go | 26 ++++++++++++++++--- .../Kafka2ES/Kafka2ESTask/Kafka2ESTask.go | 12 +++++---- dsDataex/MyTask/main.go | 3 +++ dsDataex/Utils/KafkaUtil/KafkaUtil.go | 10 +++++-- dsDataex/docs/docs.go | 20 +++++++------- dsDataex/docs/swagger.json | 20 +++++++------- dsDataex/docs/swagger.yaml | 20 +++++++------- 14 files changed, 87 insertions(+), 52 deletions(-) diff --git a/dsDataex/MyService/DataEX/DataexService/DataexService.go b/dsDataex/MyService/DataEX/DataexService/DataexService.go index 7459722a..8363c323 100644 --- a/dsDataex/MyService/DataEX/DataexService/DataexService.go +++ b/dsDataex/MyService/DataEX/DataexService/DataexService.go @@ -11,6 +11,7 @@ import ( "encoding/json" "fmt" "strconv" + "strings" "time" ) @@ -516,6 +517,9 @@ func DataexPage(datasourceCode string,consumeType int,consumeOrgID string,orgID if flag{ var lstEsdata []DataEX.ESData + //add by zhangjun 2020-08-02 + beginTime =strings.Replace(beginTime,"-","/",-1) + if orgID=="-1"{ flag,result,lstEsdata= ES7Util.IndexDocPage(datasourceCode,nil,pageNO,beginTime) diff --git a/dsDataex/MyService/MySwagger/Auth.go b/dsDataex/MyService/MySwagger/Auth.go index 3232ce53..8306d536 100644 --- a/dsDataex/MyService/MySwagger/Auth.go +++ b/dsDataex/MyService/MySwagger/Auth.go @@ -4,5 +4,5 @@ package MySwagger type Auth struct { SystemID string `json:"system_id" example:"SYS-200201" ` SystemToken string `json:"system_token" example:"a6ce11eab94df48a6ce11eab94df48e38f73cf7e38f73cf7"` - AuthTime string `json:"auth_time" example:"2020-01-02 03:04:05"` + AuthTime string `json:"auth_time" example:"2020/01/02 03:04:05"` } \ No newline at end of file diff --git a/dsDataex/MyService/MySwagger/DataGet.go b/dsDataex/MyService/MySwagger/DataGet.go index 9efdc6df..5c34e1bf 100644 --- a/dsDataex/MyService/MySwagger/DataGet.go +++ b/dsDataex/MyService/MySwagger/DataGet.go @@ -3,7 +3,7 @@ package MySwagger type DataGet struct { SystemID string `json:"system_id" example:"SYS-200201"` AuthToken string `json:"auth_token" example:"DATAEX-TOKEN-a6ce-11ea-b94df48e38f73cf7"` - DataSource string `json:"data_source" example:"ORG"` - OrgID string `json:"org_id" example:"200201"` + DataSource string `json:"data_source" example:"org_school"` + OrgID string `json:"org_id" example:"-1"` DataID string `json:"data_id" example:"202008080008"` } diff --git a/dsDataex/MyService/MySwagger/DataPage.go b/dsDataex/MyService/MySwagger/DataPage.go index ba0ca586..5bb43aa0 100644 --- a/dsDataex/MyService/MySwagger/DataPage.go +++ b/dsDataex/MyService/MySwagger/DataPage.go @@ -3,8 +3,8 @@ package MySwagger type DataPage struct { SystemID string `json:"system_id" example:"SYS-200201"` AuthToken string `json:"auth_token" example:"DATAEX-TOKEN-a6ce-11ea-b94df48e38f73cf7"` - DataSource string `json:"data_source" example:"ORG"` - OrgID string `json:"org_id" example:"200201"` - QueryTime string `json:"query_time" example:"2020-01-01 02:03:04"` + DataSource string `json:"data_source" example:"org_school"` + OrgID string `json:"org_id" example:"-1"` + QueryTime string `json:"query_time" example:"2020/01/01 02:03:04"` QueryPage int `json:"query_page" example:"0"` } \ No newline at end of file diff --git a/dsDataex/MyService/MySwagger/DataQuery.go b/dsDataex/MyService/MySwagger/DataQuery.go index 217c19de..92cb10a2 100644 --- a/dsDataex/MyService/MySwagger/DataQuery.go +++ b/dsDataex/MyService/MySwagger/DataQuery.go @@ -3,8 +3,8 @@ package MySwagger type DataQuery struct { SystemID string `json:"system_id" example:"SYS-200201"` AuthToken string `json:"auth_token" example:"DATAEX-TOKEN-a6ce-11ea-b94df48e38f73cf7"` - DataSource string `json:"data_source" example:"ORG"` - OrgID string `json:"org_id" example:"200201"` + DataSource string `json:"data_source" example:"org_school"` + OrgID string `json:"org_id" example:"-1"` QueryConditions []string `json:"query_conditions" example:"subject_code=22"` - QueryPage int `json:"query_page" example:"1"` + QueryPage int `json:"query_page" example:"0"` } \ No newline at end of file diff --git a/dsDataex/MyService/MySwagger/Event.go b/dsDataex/MyService/MySwagger/Event.go index 8454db2a..733ea06f 100644 --- a/dsDataex/MyService/MySwagger/Event.go +++ b/dsDataex/MyService/MySwagger/Event.go @@ -4,7 +4,7 @@ type Event struct { EventType string `json:"event_type" example:"1001"` EventName string `json:"event_name" example:"在线提交作业"` - EventTime string `json:"event_time" example:"2020-08-08 18:19:20"` + EventTime string `json:"event_time" example:"2020/08/08 18:19:20"` EventURI string `json:"event_uri" example:"https://edusoa.com/cloud/study/do_homework"` EventSeqNO string `json:"event_seqno" example:"事件顺序号:1/2/3"` EventUserID string `json:"event_userid" example:"用户ID"` diff --git a/dsDataex/MyService/MySwagger/User.go b/dsDataex/MyService/MySwagger/User.go index 5bd57dfc..95c312d1 100644 --- a/dsDataex/MyService/MySwagger/User.go +++ b/dsDataex/MyService/MySwagger/User.go @@ -5,8 +5,8 @@ type User struct { UserID string `json:"user_id" example:"用户ID"` Identity string `json:"identity" example:"用户身份(1:教师,2:学生,3:家长,4:管理员,5:访客)"` AccessID string `json:"access_id" example:"访客ID(用户未登录)"` - AccessIP string `json:"access_ip" example:"10.10.8.88"` - AccessWay string `json:"access_way" example:"app/web/weixin/dingding"` + AccessIP string `json:"access_ip" example:"10.10.8.88"` + AccessWay string `json:"access_way" example:"app/web/weixin/dingding"` Properties string `json:"properties" example:"{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 12:13:14}"` } diff --git a/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go b/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go index ccd8a843..166addf8 100644 --- a/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go +++ b/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go @@ -22,6 +22,8 @@ func ServiceStart() { cronMan.Every(5).Seconds().StartImmediately().Do(DBWatch) + cronMan.Every(10).Seconds().Do(LogProcess) + cronMan.StartAsync() defer func() { @@ -36,6 +38,7 @@ func ServiceStart() { KafkaUtil.ChanTopicProc = make(map[string][]chan bool) KafkaUtil.StateTopicProc = make(map[string][]bool) + KafkaUtil.CountTopicProc = make(map[string][]int) ChanTopic = make(chan []string, 100) @@ -48,9 +51,10 @@ func ServiceStart() { LstTopic = append(LstTopic, topic) + //change by zhangjun 2020-08-02 cronMan.Every(60).Seconds().SetTag([]string{"kafka_" + topic}).StartImmediately().Do(KafkaProcess, topic, procNo) + //go KafkaProcess(topic, procNo) - //cronMan.StartAsync() } } @@ -93,6 +97,7 @@ func KafkaProcess(topic string, procNo int) { var lst []chan bool var lst2 []bool + var lst3 []int for no := 0; no < procNo; no++ { @@ -100,12 +105,14 @@ func KafkaProcess(topic string, procNo int) { lst = append(lst, chanProc) lst2 = append(lst2, true) + lst3 = append(lst3, 0) } //add by zhangjun 2020-07-30 loc.Lock() KafkaUtil.ChanTopicProc[topic] = lst KafkaUtil.StateTopicProc[topic] = lst2 + KafkaUtil.CountTopicProc[topic] = lst3 loc.Unlock() @@ -116,7 +123,7 @@ func KafkaProcess(topic string, procNo int) { //开启子线程 go KafkaUtil.Consume(topic, "group_"+topic, no) - time.Sleep(time.Second *5) + time.Sleep(time.Second * 10) } } else { //TODO:处理异常子线程!!! @@ -125,10 +132,23 @@ func KafkaProcess(topic string, procNo int) { fmt.Printf("Dataex Consume Kafka ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, "group_"+topic, no) KafkaUtil.StateTopicProc[topic][no] = true + KafkaUtil.CountTopicProc[topic][no] = 0 go KafkaUtil.Consume(topic, "group_"+topic, no) - time.Sleep(time.Second *5) + time.Sleep(time.Second * 10) + } + } + } +} + +func LogProcess() { + + for k,v:= range KafkaUtil.CountTopicProc{ + + if len(KafkaUtil.CountTopicProc[k])>0{ + for no:=0;no< len(v);no++{ + fmt.Println("Kafka Consume Topic:"+k,"【",time.Now(),"】 Process Total:",v[no]) } } } diff --git a/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go b/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go index 4306ae4b..ade6e579 100644 --- a/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go +++ b/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go @@ -160,19 +160,21 @@ func Process(topic string,msg kafka.Message){ myDoc.DataContent["access_cityname"]="" } + + //TODO: 创建 ES 索引,此处并发执行,来不及判断、创建索引 + //add by zhangjun 2020-07-30 + //loc.Lock() + flag,_= ES7Util.IndexExist(myMsg.DatasourceId) - //创建 ES 索引 if flag==false { - //add by zhangjun 2020-07-30 - loc.Lock() ES7Util.IndexInit(myMsg.DatasourceId, myDoc.DataContent) - loc.Unlock() - time.Sleep(time.Millisecond * 100) } + //loc.Unlock() + ES7Util.IndexDocAdd(myMsg.DatasourceId, &myDoc) } \ No newline at end of file diff --git a/dsDataex/MyTask/main.go b/dsDataex/MyTask/main.go index 5912c1db..10f34b5f 100644 --- a/dsDataex/MyTask/main.go +++ b/dsDataex/MyTask/main.go @@ -2,6 +2,7 @@ package main import ( "dsDataex/MyTask/Kafka2ES/Kafka2ESService" + "dsDataex/Utils/CacheUtil" "fmt" "time" ) @@ -23,6 +24,8 @@ func main() { //go proc() //time.Sleep(60 * time.Minute) + CacheUtil.OrgtreeCacheInit() + Kafka2ESService.ServiceStart() //db, _ := geoip2.Open("GeoLite2/GeoLite2-City.mmdb") diff --git a/dsDataex/Utils/KafkaUtil/KafkaUtil.go b/dsDataex/Utils/KafkaUtil/KafkaUtil.go index 177f76a4..e0b83ac3 100644 --- a/dsDataex/Utils/KafkaUtil/KafkaUtil.go +++ b/dsDataex/Utils/KafkaUtil/KafkaUtil.go @@ -26,6 +26,8 @@ var ChanTopicProc map[string][]chan bool //记录 consume 子线程状态 ( 10 分钟内是否成功执行 ReadMessage,不执行自动关闭子线程 ) var StateTopicProc map[string][]bool +var CountTopicProc map[string][]int + func init() { KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0) @@ -200,6 +202,8 @@ myLoop: r.Close() StateTopicProc[topic][index] = false + + fmt.Printf("KafkaUtil Consume Close ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index) return } //case <- ticker.C: @@ -213,15 +217,17 @@ myLoop: // } default: - msg, err := r.ReadMessage(ctx) + msg, err := r.ReadMessage( ctx ) if err != nil { - //fmt.Println("KafkaUtil ReadMessage Error :",err.Error()) + fmt.Println("KafkaUtil ReadMessage Error :",err.Error()) break myLoop } Kafka2ESTask.Process(topic, msg) + //count++ + CountTopicProc[topic][index] ++ //fmt.Printf("message at partiton %d offset %d: %s ==> %s\n",msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } } diff --git a/dsDataex/docs/docs.go b/dsDataex/docs/docs.go index cc4c95b4..aa78d932 100644 --- a/dsDataex/docs/docs.go +++ b/dsDataex/docs/docs.go @@ -1414,7 +1414,7 @@ var doc = `{ "properties": { "auth_time": { "type": "string", - "example": "2020-01-02 03:04:05" + "example": "2020/01/02 03:04:05" }, "system_id": { "type": "string", @@ -1485,11 +1485,11 @@ var doc = `{ }, "data_source": { "type": "string", - "example": "ORG" + "example": "org_school" }, "org_id": { "type": "string", - "example": "200201" + "example": "-1" }, "system_id": { "type": "string", @@ -1578,11 +1578,11 @@ var doc = `{ }, "data_source": { "type": "string", - "example": "ORG" + "example": "org_school" }, "org_id": { "type": "string", - "example": "200201" + "example": "-1" }, "query_page": { "type": "integer", @@ -1590,7 +1590,7 @@ var doc = `{ }, "query_time": { "type": "string", - "example": "2020-01-01 02:03:04" + "example": "2020/01/01 02:03:04" }, "system_id": { "type": "string", @@ -1607,11 +1607,11 @@ var doc = `{ }, "data_source": { "type": "string", - "example": "ORG" + "example": "org_school" }, "org_id": { "type": "string", - "example": "200201" + "example": "-1" }, "query_conditions": { "type": "array", @@ -1624,7 +1624,7 @@ var doc = `{ }, "query_page": { "type": "integer", - "example": 1 + "example": 0 }, "system_id": { "type": "string", @@ -1749,7 +1749,7 @@ var doc = `{ }, "event_time": { "type": "string", - "example": "2020-08-08 18:19:20" + "example": "2020/08/08 18:19:20" }, "event_type": { "type": "string", diff --git a/dsDataex/docs/swagger.json b/dsDataex/docs/swagger.json index 194cfd51..187a4e8a 100644 --- a/dsDataex/docs/swagger.json +++ b/dsDataex/docs/swagger.json @@ -1399,7 +1399,7 @@ "properties": { "auth_time": { "type": "string", - "example": "2020-01-02 03:04:05" + "example": "2020/01/02 03:04:05" }, "system_id": { "type": "string", @@ -1470,11 +1470,11 @@ }, "data_source": { "type": "string", - "example": "ORG" + "example": "org_school" }, "org_id": { "type": "string", - "example": "200201" + "example": "-1" }, "system_id": { "type": "string", @@ -1563,11 +1563,11 @@ }, "data_source": { "type": "string", - "example": "ORG" + "example": "org_school" }, "org_id": { "type": "string", - "example": "200201" + "example": "-1" }, "query_page": { "type": "integer", @@ -1575,7 +1575,7 @@ }, "query_time": { "type": "string", - "example": "2020-01-01 02:03:04" + "example": "2020/01/01 02:03:04" }, "system_id": { "type": "string", @@ -1592,11 +1592,11 @@ }, "data_source": { "type": "string", - "example": "ORG" + "example": "org_school" }, "org_id": { "type": "string", - "example": "200201" + "example": "-1" }, "query_conditions": { "type": "array", @@ -1609,7 +1609,7 @@ }, "query_page": { "type": "integer", - "example": 1 + "example": 0 }, "system_id": { "type": "string", @@ -1734,7 +1734,7 @@ }, "event_time": { "type": "string", - "example": "2020-08-08 18:19:20" + "example": "2020/08/08 18:19:20" }, "event_type": { "type": "string", diff --git a/dsDataex/docs/swagger.yaml b/dsDataex/docs/swagger.yaml index 508a11ba..7c1fe0bf 100644 --- a/dsDataex/docs/swagger.yaml +++ b/dsDataex/docs/swagger.yaml @@ -3,7 +3,7 @@ definitions: MySwagger.Auth: properties: auth_time: - example: "2020-01-02 03:04:05" + example: 2020/01/02 03:04:05 type: string system_id: example: SYS-200201 @@ -53,10 +53,10 @@ definitions: example: "202008080008" type: string data_source: - example: ORG + example: org_school type: string org_id: - example: "200201" + example: "-1" type: string system_id: example: SYS-200201 @@ -118,16 +118,16 @@ definitions: example: DATAEX-TOKEN-a6ce-11ea-b94df48e38f73cf7 type: string data_source: - example: ORG + example: org_school type: string org_id: - example: "200201" + example: "-1" type: string query_page: example: 0 type: integer query_time: - example: "2020-01-01 02:03:04" + example: 2020/01/01 02:03:04 type: string system_id: example: SYS-200201 @@ -139,10 +139,10 @@ definitions: example: DATAEX-TOKEN-a6ce-11ea-b94df48e38f73cf7 type: string data_source: - example: ORG + example: org_school type: string org_id: - example: "200201" + example: "-1" type: string query_conditions: example: @@ -151,7 +151,7 @@ definitions: type: string type: array query_page: - example: 1 + example: 0 type: integer system_id: example: SYS-200201 @@ -236,7 +236,7 @@ definitions: example: 事件顺序号:1/2/3 type: string event_time: - example: "2020-08-08 18:19:20" + example: 2020/08/08 18:19:20 type: string event_type: example: "1001"