diff --git a/dsDataex/Config/Config.ini b/dsDataex/Config/Config.ini index 7cca1287..919d6888 100644 --- a/dsDataex/Config/Config.ini +++ b/dsDataex/Config/Config.ini @@ -22,13 +22,12 @@ expireTime = 86400 ;db = 0 ;expireTime = 86400 - - [kafka] brokers = 10.10.14.238:9092, ;brokers = 192.168.0.200:9092,192.168.0.200:9091 +partition = 8 replication = 1 - +process_no = 2 [elasticsearch] diff --git a/dsDataex/MyService/DataEX/DataexService/DataexService.go b/dsDataex/MyService/DataEX/DataexService/DataexService.go index 7116b9d3..7459722a 100644 --- a/dsDataex/MyService/DataEX/DataexService/DataexService.go +++ b/dsDataex/MyService/DataEX/DataexService/DataexService.go @@ -9,6 +9,7 @@ import ( "dsDataex/Utils/ES7Util" "dsDataex/Utils/KafkaUtil" "encoding/json" + "fmt" "strconv" "time" ) @@ -191,6 +192,12 @@ func DataexSet(systemID string, datas []MySwagger.Data,datasource *models.TDatae func DataexCollect(systemID string,users []MySwagger.User,events []MySwagger.Event,datasource *models.TDataexDatasource,ip string) (bool,string) { + defer func(){ + if err:=recover();err!=nil{ + fmt.Println("DataexCollect Panic Recover :",err) + } + }() + var now = time.Now().Format("2006/01/02 15:04:05") var userInfo = make( map[string]MySwagger.User) diff --git a/dsDataex/MyTask/Kafka2ES/Kafka2ESDAO/Kafka2ESDAO.go b/dsDataex/MyTask/Kafka2ES/Kafka2ESDAO/Kafka2ESDAO.go new file mode 100644 index 00000000..3cebeff0 --- /dev/null +++ b/dsDataex/MyTask/Kafka2ES/Kafka2ESDAO/Kafka2ESDAO.go @@ -0,0 +1,28 @@ +package Kafka2ESDAO + +import ( + "dsDataex/Utils/DbUtil" +) + +//数据库 +var db = DbUtil.Engine + +func GetTopics() (bool, []string){ + + sql := "SELECT datasource_code from t_dataex_datasource where delete_flag = -1 and enable_flag = 1 and datastore_type = 3" + + result,err:=db.QueryString(sql) + + if err==nil{ + var lst []string + + for no:=0;no< len(result);no++{ + + lst=append(lst,result[no]["datasource_code"]) + } + + return true,lst + }else { + return false,nil + } +} \ No newline at end of file diff --git a/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go b/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go new file mode 100644 index 00000000..c9bd7d38 --- /dev/null +++ b/dsDataex/MyTask/Kafka2ES/Kafka2ESService/Kafka2ESService.go @@ -0,0 +1,132 @@ +package Kafka2ESService + +import ( + "dsDataex/MyTask/Kafka2ES/Kafka2ESDAO" + "dsDataex/Utils/ConfigUtil" + "dsDataex/Utils/KafkaUtil" + "fmt" + "github.com/go-co-op/gocron" + "reflect" + "time" +) + +var ChanTopic chan []string +var LstTopic []string + +func ServiceStart(){ + + cronMan := gocron.NewScheduler(time.UTC) + + cronMan.Every(5).Seconds().StartImmediately().Do(DBWatch) + + cronMan.StartAsync() + + var procNo=int(ConfigUtil.KafkaProcNo) + + KafkaUtil.ChanTopicProc=make(map[string][]chan bool) + KafkaUtil.StateTopicProc=make(map[string][]bool) + + ChanTopic=make(chan []string,100) + + for topics :=range ChanTopic{ + for no :=0;no< len(topics);no++ { + + topic:=topics[no] + + if Contains(LstTopic, topic) == -1 { + + LstTopic = append(LstTopic, topic) + + cronMan.Every(60).Seconds().SetTag([]string{"kafka_" + topic}).StartImmediately().Do(KafkaProcess, topic, procNo) + + //cronMan.StartAsync() + } + } + + if len(LstTopic)> len(topics){ + for no:=0;no< len(LstTopic);no++{ + if Contains(topics,LstTopic[no])==-1{ + + //删除任务 + cronMan.RemoveJobByTag("kafka_"+LstTopic[no]) + + //关闭子线程 + for no2:=0;no2< len(KafkaUtil.ChanTopicProc[LstTopic[no]]);no2++{ + KafkaUtil.ChanTopicProc[LstTopic[no]][no] <- true + } + + delete(KafkaUtil.ChanTopicProc,LstTopic[no]) + delete(KafkaUtil.StateTopicProc,LstTopic[no]) + } + } + + LstTopic=[]string{} + + LstTopic=append(LstTopic,topics...) + } + } +} + +func DBWatch() { + + var _,topics = Kafka2ESDAO.GetTopics() + + ChanTopic <- topics +} + +func KafkaProcess(topic string,procNo int) { + + _,f:=KafkaUtil.ChanTopicProc[topic] + + if f==false{ + + var lst []chan bool + var lst2 []bool + + for no:=0;no %s\n",msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } - - fmt.Printf("message at partiton %d offset %d: %s ==> %s\n",m.Partition, m.Offset, string(m.Key), string(m.Value)) } + StateTopicProc[topic][index]=false + r.Close() + + fmt.Printf("KafkaUtil Consume Stop ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n",topic,group,index) } /** @@ -205,7 +267,7 @@ func Provide(topic string,datas []DataEX.KafkaData)(bool,string){ fmt.Println("Time 9:",time.Now(),",spend:",time.Since(begin)) - err:= w.WriteMessages(CTX ,messages...) + err:= w.WriteMessages( context.Background() ,messages...) fmt.Println("Time 10:",time.Now(),",spend:",time.Since(begin)) //w.Close() @@ -219,12 +281,19 @@ func Provide(topic string,datas []DataEX.KafkaData)(bool,string){ } } +/** + * @Author zhangjun + * @Description 创建 Kafka Topic + * @Date 2020-07-29 02:30 + * @Param + * @return + **/ func CreateTopic(topic string){ - KafkaClient, _ := kafka.DialLeader(CTX , "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets" , 0) + KafkaClient, _ := kafka.DialLeader( context.Background() , "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets" , 0) err:= KafkaClient.CreateTopics(kafka.TopicConfig{ - NumPartitions: 8, + NumPartitions: int(ConfigUtil.KafkaParts), ReplicationFactor: int(ConfigUtil.KafkaReply), Topic: topic, }) @@ -234,8 +303,15 @@ func CreateTopic(topic string){ } } +/** + * @Author zhangjun + * @Description 删除Kafka Topic + * @Date 2020-07-29 02:30 + * @Param + * @return + **/ func DeleteTopic(topic string){ - KafkaClient, _ := kafka.DialLeader( CTX, "tcp", ConfigUtil.KafkaBrokers[0], topic , 0) + KafkaClient, _ := kafka.DialLeader( context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0) err:= KafkaClient.DeleteTopics(topic) diff --git a/dsDataex/go.mod b/dsDataex/go.mod index 1b829466..8dd08240 100644 --- a/dsDataex/go.mod +++ b/dsDataex/go.mod @@ -17,6 +17,7 @@ require ( github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/garyburd/redigo v1.6.0 github.com/gin-gonic/gin v1.6.3 + github.com/go-co-op/gocron v0.3.0 github.com/go-openapi/loads v0.19.5 github.com/go-openapi/spec v0.19.6 github.com/go-redis/redis/v7 v7.3.0 diff --git a/dsDataex/go.sum b/dsDataex/go.sum index 05bb09f8..05292889 100644 --- a/dsDataex/go.sum +++ b/dsDataex/go.sum @@ -158,6 +158,8 @@ github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= +github.com/go-co-op/gocron v0.3.0 h1:GVNbAB0rrMaP/v1Xs8t2/NzyEG4vP8UbLNy6C22o3RY= +github.com/go-co-op/gocron v0.3.0/go.mod h1:Y9PWlYqDChf2Nbgg7kfS+ZsXHDTZbMZYPEQ0MILqH+M= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= @@ -210,6 +212,8 @@ github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD87 github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/go-redis/redis v6.15.5+incompatible h1:pLky8I0rgiblWfa8C1EV7fPEUv0aH6vKRaYHc/YRHVk= +github.com/go-redis/redis v6.15.5+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis v6.15.8+incompatible h1:BKZuG6mCnRj5AOaWJXoCgf6rqTYnYJLe4en2hxT7r9o= github.com/go-redis/redis/v7 v7.3.0 h1:3oHqd0W7f/VLKBxeYTEpqdMUsmMectngjM9OtoRoIgg= github.com/go-redis/redis/v7 v7.3.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=