master
zhangjun 5 years ago
parent c0fedf2364
commit f40287c38d

@ -27,7 +27,7 @@ brokers = 10.10.14.238:9092,
;brokers = 192.168.0.200:9092,192.168.0.200:9091
partition = 8
replication = 1
process_no = 2
process_no = 1
[elasticsearch]

@ -7,12 +7,15 @@ import (
"fmt"
"github.com/go-co-op/gocron"
"reflect"
"sync"
"time"
)
var ChanTopic chan []string
var LstTopic []string
var loc sync.Mutex
func ServiceStart() {
cronMan := gocron.NewScheduler(time.UTC)
@ -21,6 +24,14 @@ func ServiceStart() {
cronMan.StartAsync()
defer func() {
if err := recover(); err != nil {
fmt.Println("Kafka2ESService Panic Recover :", err)
cronMan.Stop()
}
}()
var procNo = int(ConfigUtil.KafkaProcNo)
KafkaUtil.ChanTopicProc = make(map[string][]chan bool)
@ -90,16 +101,22 @@ func KafkaProcess(topic string, procNo int) {
lst = append(lst, chanProc)
lst2 = append(lst2, true)
}
//add by zhangjun 2020-07-30
loc.Lock()
KafkaUtil.ChanTopicProc[topic] = lst
KafkaUtil.StateTopicProc[topic] = lst2
loc.Unlock()
for no := 0; no < procNo; no++ {
fmt.Printf("Dataex Consume Kafka ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, "group_"+topic, no)
//开启子线程
go KafkaUtil.Consume(topic, "group_"+topic, no)
time.Sleep(time.Second *5)
}
} else { //TODO处理异常子线程
@ -110,6 +127,8 @@ func KafkaProcess(topic string, procNo int) {
KafkaUtil.StateTopicProc[topic][no] = true
go KafkaUtil.Consume(topic, "group_"+topic, no)
time.Sleep(time.Second *5)
}
}
}

@ -4,12 +4,17 @@ import (
"dsDataex/MyReport/ESSql/ESSqlService"
"dsDataex/MyService/DataEX"
"dsDataex/Utils/CommonUtil"
"dsDataex/Utils/ES7Util"
"dsDataex/Utils/GeoIPUtil"
"encoding/json"
"fmt"
"github.com/segmentio/kafka-go"
"sync"
"time"
)
var loc sync.Mutex
func Process(topic string,msg kafka.Message){
var myMsg DataEX.KafkaData
@ -25,8 +30,8 @@ func Process(topic string,msg kafka.Message){
myDoc.DataId=CommonUtil.GetUUID()
myDoc.BeginTime = DataEX.JsonDate(time.Now())
myDoc.EndTime = DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location()))
//myDoc.BeginTime = DataEX.JsonDate(time.Now())
//myDoc.EndTime = DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location()))
//2、Data_Content 部分
myDoc.DataContent=make(map[string]interface{})
@ -65,21 +70,109 @@ func Process(topic string,msg kafka.Message){
flag,userDetail :=ESSqlService.GetUser4Kafka(myMsg.UserID,myMsg.Identity)
if flag==true{
if len(userDetail)==1{
if len(userDetail)>0{
if len(userDetail)>1{
fmt.Println("ESSqlService.GetUser4Kafka have more than 1 results,user_id:"+myMsg.UserID)
}
for k,v:= range userDetail[0]{
_,f:=myDoc.DataContent[k]
if f==false{
myDoc.DataContent[k] =v
switch k {
case "province_code":
myDoc.ProvinceId=v.(string)
break
case "province_name":
myDoc.ProvinceName=v.(string)
break
case "city_code":
myDoc.CityId=v.(string)
break
case "city_name":
myDoc.CityName=v.(string)
break
case "district_code":
myDoc.AreaId=v.(string)
break
case "district_name":
myDoc.AreaName=v.(string)
break
case "bureau_id":
myDoc.BureauId=v.(string)
break
case "region_id":
myDoc.RegionId=v.(string)
break
case "main_id":
myDoc.MainId=v.(string)
break
case "org_id":
myDoc.OrgId=v.(string)
break
case "org_name":
myDoc.OrgName=v.(string)
break
case "org_type":
myDoc.OrgType=int(v.(float64))
break
case "school_type":
myDoc.SchoolType=v.(string)
break
case "school_typename":
myDoc.SchoolTypeName=v.(string)
break
case "dept_id":
myDoc.DeptId=v.(string)
break
case "stage_id":
myDoc.StageId=v.(string)
break
case "grade_id":
myDoc.GradeId=v.(string)
break
case "class_id":
myDoc.ClassId=v.(string)
break
default:
_,f:=myDoc.DataContent[k]
if f==false{
myDoc.DataContent[k] =v
}
break
}
}
}else {
fmt.Println("ESSqlService.GetUser4Kafka have more than 1 results,user_id:"+myMsg.UserID)
}
}
//4、补充 Access_IP 对映的所在地信息
var geo=GeoIPUtil.GetGeo4IP(myMsg.AccessIP)
if len(geo)>0{
myDoc.DataContent["access_provincecode"]=geo[0]
myDoc.DataContent["access_provincename"]=geo[1]
myDoc.DataContent["access_citycode"]=geo[2]
myDoc.DataContent["access_cityname"]=geo[3]
}else {
myDoc.DataContent["access_provincecode"]=""
myDoc.DataContent["access_provincename"]=""
myDoc.DataContent["access_citycode"]=""
myDoc.DataContent["access_cityname"]=""
}
flag,_= ES7Util.IndexExist(myMsg.DatasourceId)
//创建 ES 索引
if flag==false {
//add by zhangjun 2020-07-30
loc.Lock()
ES7Util.IndexInit(myMsg.DatasourceId, myDoc.DataContent)
loc.Unlock()
time.Sleep(time.Millisecond * 100)
}
ES7Util.IndexDocAdd(myMsg.DatasourceId, &myDoc)
}

@ -1,7 +1,7 @@
package main
import (
"dsDataex/Utils/GeoIPUtil"
"dsDataex/MyTask/Kafka2ES/Kafka2ESService"
"fmt"
"time"
)
@ -23,7 +23,7 @@ func main() {
//go proc()
//time.Sleep(60 * time.Minute)
//Kafka2ESService.ServiceStart()
Kafka2ESService.ServiceStart()
//db, _ := geoip2.Open("GeoLite2/GeoLite2-City.mmdb")
//defer db.Close()
@ -34,12 +34,12 @@ func main() {
////fmt.Println("GeoLite2 :",record)
//fmt.Println(record.Country.Names["ch-ZN"],record.Subdivisions[0].Names["zh-CN"], record.City.Names["zh-CN"])
fmt.Println(GeoIPUtil.GetGeo4IP("124.235.206.60"))
fmt.Println(GeoIPUtil.GetGeo4IP("202.106.212.226"))
fmt.Println(GeoIPUtil.GetGeo4IP("220.181.38.148"))
fmt.Println(GeoIPUtil.GetGeo4IP("123.103.122.24"))
fmt.Println(GeoIPUtil.GetGeo4IP("222.161.206.62"))
fmt.Println(GeoIPUtil.GetGeo4IP("121.28.35.6"))
//fmt.Println(GeoIPUtil.GetGeo4IP("124.235.206.60"))
//fmt.Println(GeoIPUtil.GetGeo4IP("202.106.212.226"))
//fmt.Println(GeoIPUtil.GetGeo4IP("220.181.38.148"))
//fmt.Println(GeoIPUtil.GetGeo4IP("123.103.122.24"))
//fmt.Println(GeoIPUtil.GetGeo4IP("222.161.206.62"))
//fmt.Println(GeoIPUtil.GetGeo4IP("121.28.35.6"))
//q := qqwry.NewQQwry("GeoLite2/qqwry.dat")
//q.Find("222.161.206.62")

@ -12,7 +12,7 @@ var City2260 map[string][]string
func init() {
//打开 IP离线数据
//打开 纯真 IP离线库
DB = qqwry.NewQQwry("GeoLite2/qqwry.dat")
//行政区划缓存

@ -169,6 +169,7 @@ func Consume(topic string, group string, index int) {
defer func() {
if err := recover(); err != nil {
fmt.Println("KafkaUtil Consume Panic Recover :", err)
StateTopicProc[topic][index] = false
}
}()
@ -228,7 +229,7 @@ myLoop:
StateTopicProc[topic][index] = false
r.Close()
//fmt.Printf("KafkaUtil Consume Stop ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index)
fmt.Printf("KafkaUtil Consume Stop ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index)
}
/**

@ -711,6 +711,7 @@ golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=

Loading…
Cancel
Save