diff --git a/dsDataex/Config/Config.ini b/dsDataex/Config/Config.ini index 32a1408b..64178af2 100644 --- a/dsDataex/Config/Config.ini +++ b/dsDataex/Config/Config.ini @@ -27,7 +27,7 @@ brokers = 10.10.14.238:9092, ;brokers = 192.168.0.200:9092,192.168.0.200:9091 partition = 8 replication = 1 -process_no = 2 +process_no = 1 [elasticsearch] diff --git a/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go b/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go index d515df01..ccd8a843 100644 --- a/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go +++ b/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go @@ -7,12 +7,15 @@ import ( "fmt" "github.com/go-co-op/gocron" "reflect" + "sync" "time" ) var ChanTopic chan []string var LstTopic []string +var loc sync.Mutex + func ServiceStart() { cronMan := gocron.NewScheduler(time.UTC) @@ -21,6 +24,14 @@ func ServiceStart() { cronMan.StartAsync() + defer func() { + if err := recover(); err != nil { + fmt.Println("Kafka2ESService Panic Recover :", err) + + cronMan.Stop() + } + }() + var procNo = int(ConfigUtil.KafkaProcNo) KafkaUtil.ChanTopicProc = make(map[string][]chan bool) @@ -90,16 +101,22 @@ func KafkaProcess(topic string, procNo int) { lst = append(lst, chanProc) lst2 = append(lst2, true) } + //add by zhangjun 2020-07-30 + loc.Lock() KafkaUtil.ChanTopicProc[topic] = lst KafkaUtil.StateTopicProc[topic] = lst2 + loc.Unlock() + for no := 0; no < procNo; no++ { fmt.Printf("Dataex Consume Kafka ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, "group_"+topic, no) //开启子线程 go KafkaUtil.Consume(topic, "group_"+topic, no) + + time.Sleep(time.Second *5) } } else { //TODO:处理异常子线程!!! @@ -110,6 +127,8 @@ func KafkaProcess(topic string, procNo int) { KafkaUtil.StateTopicProc[topic][no] = true go KafkaUtil.Consume(topic, "group_"+topic, no) + + time.Sleep(time.Second *5) } } } diff --git a/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go b/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go index 47b6cb29..4306ae4b 100644 --- a/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go +++ b/dsDataex/MyTask/Kafka2ES/Kafka2ESTask/Kafka2ESTask.go @@ -4,12 +4,17 @@ import ( "dsDataex/MyReport/ESSql/ESSqlService" "dsDataex/MyService/DataEX" "dsDataex/Utils/CommonUtil" + "dsDataex/Utils/ES7Util" + "dsDataex/Utils/GeoIPUtil" "encoding/json" "fmt" "github.com/segmentio/kafka-go" + "sync" "time" ) +var loc sync.Mutex + func Process(topic string,msg kafka.Message){ var myMsg DataEX.KafkaData @@ -25,8 +30,8 @@ func Process(topic string,msg kafka.Message){ myDoc.DataId=CommonUtil.GetUUID() - myDoc.BeginTime = DataEX.JsonDate(time.Now()) - myDoc.EndTime = DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location())) + //myDoc.BeginTime = DataEX.JsonDate(time.Now()) + //myDoc.EndTime = DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location())) //2、Data_Content 部分 myDoc.DataContent=make(map[string]interface{}) @@ -65,21 +70,109 @@ func Process(topic string,msg kafka.Message){ flag,userDetail :=ESSqlService.GetUser4Kafka(myMsg.UserID,myMsg.Identity) if flag==true{ - if len(userDetail)==1{ + if len(userDetail)>0{ + if len(userDetail)>1{ + fmt.Println("ESSqlService.GetUser4Kafka have more than 1 results,user_id:"+myMsg.UserID) + } for k,v:= range userDetail[0]{ - _,f:=myDoc.DataContent[k] - - if f==false{ - myDoc.DataContent[k] =v + switch k { + case "province_code": + myDoc.ProvinceId=v.(string) + break + case "province_name": + myDoc.ProvinceName=v.(string) + break + case "city_code": + myDoc.CityId=v.(string) + break + case "city_name": + myDoc.CityName=v.(string) + break + case "district_code": + myDoc.AreaId=v.(string) + break + case "district_name": + myDoc.AreaName=v.(string) + break + case "bureau_id": + myDoc.BureauId=v.(string) + break + case "region_id": + myDoc.RegionId=v.(string) + break + case "main_id": + myDoc.MainId=v.(string) + break + case "org_id": + myDoc.OrgId=v.(string) + break + case "org_name": + myDoc.OrgName=v.(string) + break + case "org_type": + myDoc.OrgType=int(v.(float64)) + break + case "school_type": + myDoc.SchoolType=v.(string) + break + case "school_typename": + myDoc.SchoolTypeName=v.(string) + break + case "dept_id": + myDoc.DeptId=v.(string) + break + case "stage_id": + myDoc.StageId=v.(string) + break + case "grade_id": + myDoc.GradeId=v.(string) + break + case "class_id": + myDoc.ClassId=v.(string) + break + default: + + _,f:=myDoc.DataContent[k] + + if f==false{ + myDoc.DataContent[k] =v + } + break } } - }else { - fmt.Println("ESSqlService.GetUser4Kafka have more than 1 results,user_id:"+myMsg.UserID) } } //4、补充 Access_IP 对映的所在地信息 + var geo=GeoIPUtil.GetGeo4IP(myMsg.AccessIP) + + if len(geo)>0{ + myDoc.DataContent["access_provincecode"]=geo[0] + myDoc.DataContent["access_provincename"]=geo[1] + myDoc.DataContent["access_citycode"]=geo[2] + myDoc.DataContent["access_cityname"]=geo[3] + }else { + myDoc.DataContent["access_provincecode"]="" + myDoc.DataContent["access_provincename"]="" + myDoc.DataContent["access_citycode"]="" + myDoc.DataContent["access_cityname"]="" + } + + flag,_= ES7Util.IndexExist(myMsg.DatasourceId) + + //创建 ES 索引 + if flag==false { + //add by zhangjun 2020-07-30 + loc.Lock() + + ES7Util.IndexInit(myMsg.DatasourceId, myDoc.DataContent) + + loc.Unlock() + + time.Sleep(time.Millisecond * 100) + } + ES7Util.IndexDocAdd(myMsg.DatasourceId, &myDoc) } \ No newline at end of file diff --git a/dsDataex/MyTask/main.go b/dsDataex/MyTask/main.go index 74b46bff..5912c1db 100644 --- a/dsDataex/MyTask/main.go +++ b/dsDataex/MyTask/main.go @@ -1,7 +1,7 @@ package main import ( - "dsDataex/Utils/GeoIPUtil" + "dsDataex/MyTask/Kafka2ES/Kafka2ESService" "fmt" "time" ) @@ -23,7 +23,7 @@ func main() { //go proc() //time.Sleep(60 * time.Minute) - //Kafka2ESService.ServiceStart() + Kafka2ESService.ServiceStart() //db, _ := geoip2.Open("GeoLite2/GeoLite2-City.mmdb") //defer db.Close() @@ -34,12 +34,12 @@ func main() { ////fmt.Println("GeoLite2 :",record) //fmt.Println(record.Country.Names["ch-ZN"],record.Subdivisions[0].Names["zh-CN"], record.City.Names["zh-CN"]) - fmt.Println(GeoIPUtil.GetGeo4IP("124.235.206.60")) - fmt.Println(GeoIPUtil.GetGeo4IP("202.106.212.226")) - fmt.Println(GeoIPUtil.GetGeo4IP("220.181.38.148")) - fmt.Println(GeoIPUtil.GetGeo4IP("123.103.122.24")) - fmt.Println(GeoIPUtil.GetGeo4IP("222.161.206.62")) - fmt.Println(GeoIPUtil.GetGeo4IP("121.28.35.6")) + //fmt.Println(GeoIPUtil.GetGeo4IP("124.235.206.60")) + //fmt.Println(GeoIPUtil.GetGeo4IP("202.106.212.226")) + //fmt.Println(GeoIPUtil.GetGeo4IP("220.181.38.148")) + //fmt.Println(GeoIPUtil.GetGeo4IP("123.103.122.24")) + //fmt.Println(GeoIPUtil.GetGeo4IP("222.161.206.62")) + //fmt.Println(GeoIPUtil.GetGeo4IP("121.28.35.6")) //q := qqwry.NewQQwry("GeoLite2/qqwry.dat") //q.Find("222.161.206.62") diff --git a/dsDataex/Utils/GeoIPUtil/GeoIPUtil.go b/dsDataex/Utils/GeoIPUtil/GeoIPUtil.go index 68ef0a06..d6ac37ca 100644 --- a/dsDataex/Utils/GeoIPUtil/GeoIPUtil.go +++ b/dsDataex/Utils/GeoIPUtil/GeoIPUtil.go @@ -12,7 +12,7 @@ var City2260 map[string][]string func init() { - //打开 IP离线数据库 + //打开 纯真 IP离线库 DB = qqwry.NewQQwry("GeoLite2/qqwry.dat") //行政区划缓存 diff --git a/dsDataex/Utils/KafkaUtil/KafkaUtil.go b/dsDataex/Utils/KafkaUtil/KafkaUtil.go index 3c82267d..177f76a4 100644 --- a/dsDataex/Utils/KafkaUtil/KafkaUtil.go +++ b/dsDataex/Utils/KafkaUtil/KafkaUtil.go @@ -169,6 +169,7 @@ func Consume(topic string, group string, index int) { defer func() { if err := recover(); err != nil { fmt.Println("KafkaUtil Consume Panic Recover :", err) + StateTopicProc[topic][index] = false } }() @@ -228,7 +229,7 @@ myLoop: StateTopicProc[topic][index] = false r.Close() - //fmt.Printf("KafkaUtil Consume Stop ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index) + fmt.Printf("KafkaUtil Consume Stop ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index) } /** diff --git a/dsDataex/go.sum b/dsDataex/go.sum index 7619d715..48223d06 100644 --- a/dsDataex/go.sum +++ b/dsDataex/go.sum @@ -711,6 +711,7 @@ golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=