package Kafka2ESService import ( "dsDataex/MyTask/Kafka2ES/Kafka2ESDAO" "dsDataex/Utils/ES7Util" "dsDataex/Utils/KafkaUtil" "fmt" "github.com/go-co-op/gocron" "reflect" "sync" "time" ) var ChanTopic chan []string //var LstTopic []string var GROUP_NO = 0 var loc sync.Mutex /** * @Author zhangjun * @Description Kafka 2 ES 数据处理服务启动 * @Date 2020-08-04 09:59 * @Param * @return **/ func ServiceStart() { cronMan := gocron.NewScheduler(time.UTC) cronMan.Every(5).Seconds().StartImmediately().Do(DBWatchProcess) cronMan.Every(10).Seconds().Do(LogProcess) cronMan.Every(60).Seconds().Do(ESRefreshProcess) cronMan.StartAsync() defer func() { if err := recover(); err != nil { fmt.Println("Kafka2ESService Panic Recover :", err) cronMan.Stop() } }() //var procNo = int(ConfigUtil.KafkaProcNo) //KafkaUtil.ChanTopicProc = make(map[string]chan bool) //KafkaUtil.StateTopicProc = make(map[string]bool) KafkaUtil.CountTopicProc = make(map[string]int) ChanTopic = make(chan []string, 100) for topics := range ChanTopic { for no := 0; no < len(topics); no++ { topic := topics[no] _, f := KafkaUtil.CountTopicProc[topic] if f == false { //change by zhangjun 2020-08-02 //cronMan.Every(60).Seconds().SetTag([]string{"kafka_" + topic}).StartImmediately().Do(KafkaProcess, topic) KafkaProcess(topic) //time.Sleep(time.Second * 1) } } if len(KafkaUtil.CountTopicProc) > len(topics) { for k, _ := range KafkaUtil.CountTopicProc { if Contains(topics, k) == -1 { //删除任务 //cronMan.RemoveJobByTag("kafka_" + LstTopic[no]) //关闭子线程 //KafkaUtil.ChanTopicProc[LstTopic[no]] <- true loc.Lock() //delete(KafkaUtil.ChanTopicProc, LstTopic[no]) //delete(KafkaUtil.StateTopicProc, k) delete(KafkaUtil.CountTopicProc, k) loc.Unlock() } } //LstTopic = []string{} // //LstTopic = append(LstTopic, topics...) } } } /** * @Author zhangjun * @Description 监测 mysql数据、数据源 * @Date 2020-08-04 09:59 * @Param * @return **/ func DBWatchProcess() { var _, topics = Kafka2ESDAO.GetTopics() ChanTopic <- topics } /** * @Author zhangjun * @Description Consume Kafka数据 * @Date 2020-08-04 09:59 * @Param * @return **/ func KafkaProcess(topic string) { _, f := KafkaUtil.CountTopicProc[topic] if f == false { //loc.Lock() //KafkaUtil.ChanTopicProc[topic] = nil //KafkaUtil.StateTopicProc[topic] = true KafkaUtil.CountTopicProc[topic] = 0 //loc.Unlock() if GROUP_NO == 0 { fmt.Printf("Dataex Kafka2ES Process Start,Topic:%s,ConsumerGroup:%s.\n", topic, "group_"+topic) go KafkaUtil.Consume(topic, "group_"+topic) } else { //add by zhangjun 2020-08-04 //开启双 Consume Group 线程处理,确保数据准确!!! //time.Sleep(time.Second * 5) fmt.Printf("Dataex Kafka2ES Process Start,Topic:%s,ConsumerGroup:%s.\n", topic, "group2_"+topic) go KafkaUtil.Consume(topic, "group2_"+topic) } }// else { //TODO:处理异常子线程!!! // // if KafkaUtil.CountTopicProc[topic] == false { // // loc.Lock() // // //KafkaUtil.ChanTopicProc[topic] = nil // //KafkaUtil.StateTopicProc[topic] = true // KafkaUtil.CountTopicProc[topic] = 0 // // loc.Unlock() // // if GROUP_NO == 0 { // fmt.Printf("Dataex Kafka2ES Process Start,Topic:%s,ConsumerGroup:%s.\n", topic, "group_"+topic) // go KafkaUtil.Consume(topic, "group_"+topic) // } else { // fmt.Printf("Dataex Kafka2ES Process Start,Topic:%s,ConsumerGroup:%s.\n", topic, "group2_"+topic) // go KafkaUtil.Consume(topic, "group2_"+topic) // } // //time.Sleep(time.Second * 10) // } //} } /** * @Author zhangjun * @Description 打印 Kafka 数据处理进度 * @Date 2020-08-04 09:59 * @Param * @return **/ func LogProcess() { for k, v := range KafkaUtil.CountTopicProc { fmt.Println("[Kafka] ["+k+"] "+time.Now().Format("2006/01/02 15:04:05")+" Process message total:", v) } } /** * @Author zhangjun * @Description 定期刷新ES 索引 * @Date 2020-08-04 09:59 * @Param * @return **/ func ESRefreshProcess() { for k,_:=range KafkaUtil.CountTopicProc { ES7Util.IndexRefresh( k ) } } func Contains(array interface{}, val interface{}) (index int) { index = -1 switch reflect.TypeOf(array).Kind() { case reflect.Slice: { s := reflect.ValueOf(array) for i := 0; i < s.Len(); i++ { if reflect.DeepEqual(val, s.Index(i).Interface()) { index = i return index } } } } return index }