package Kafka2ESService import ( "dsDataex/MyTask/Kafka2ES/Kafka2ESDAO" "dsDataex/Utils/KafkaUtil" "fmt" "github.com/go-co-op/gocron" "reflect" "sync" "time" ) var ChanTopic chan []string var LstTopic []string var loc sync.Mutex func ServiceStart() { cronMan := gocron.NewScheduler(time.UTC) cronMan.Every(5).Seconds().StartImmediately().Do(DBWatch) cronMan.Every(10).Seconds().Do(LogProcess) cronMan.StartAsync() defer func() { if err := recover(); err != nil { fmt.Println("Kafka2ESService Panic Recover :", err) cronMan.Stop() } }() //var procNo = int(ConfigUtil.KafkaProcNo) KafkaUtil.ChanTopicProc = make(map[string] chan bool) KafkaUtil.StateTopicProc = make(map[string] bool) KafkaUtil.CountTopicProc = make(map[string] int) ChanTopic = make(chan []string, 100) for topics := range ChanTopic { for no := 0; no < len(topics); no++ { topic := topics[no] if Contains(LstTopic, topic) == -1 { LstTopic = append(LstTopic, topic) //change by zhangjun 2020-08-02 cronMan.Every(60).Seconds().SetTag([]string{"kafka_" + topic}).StartImmediately().Do(KafkaProcess, topic) //go KafkaProcess(topic, procNo) } } if len(LstTopic) > len(topics) { for no := 0; no < len(LstTopic); no++ { if Contains(topics, LstTopic[no]) == -1 { //删除任务 cronMan.RemoveJobByTag("kafka_" + LstTopic[no]) //关闭子线程 //for no2 := 0; no2 < len(KafkaUtil.ChanTopicProc[LstTopic[no]]); no2++ { KafkaUtil.ChanTopicProc[LstTopic[no]] <- true //} delete(KafkaUtil.ChanTopicProc, LstTopic[no]) delete(KafkaUtil.StateTopicProc, LstTopic[no]) } } LstTopic = []string{} LstTopic = append(LstTopic, topics...) } } } func DBWatch() { var _, topics = Kafka2ESDAO.GetTopics() ChanTopic <- topics } func KafkaProcess(topic string) { _, f := KafkaUtil.ChanTopicProc[topic] if f == false { //var lst []chan bool //var lst2 []bool //var lst3 []int // //for no := 0; no < procNo; no++ { // // var chanProc = make(chan bool, 100) // // lst = append(lst, chanProc) // lst2 = append(lst2, true) // lst3 = append(lst3, 0) //} //add by zhangjun 2020-07-30 loc.Lock() KafkaUtil.ChanTopicProc[topic] = nil KafkaUtil.StateTopicProc[topic] = true KafkaUtil.CountTopicProc[topic] = 0 loc.Unlock() //for no := 0; no < procNo; no++ { fmt.Printf("Dataex Kafka2ES Process Start,Topic:%s,ConsumerGroup:%s.\n", topic, "group_"+topic) //开启子线程 go KafkaUtil.Consume(topic, "group_"+topic) //time.Sleep(time.Second * 10) //} } else { //TODO:处理异常子线程!!! //for no := 0; no < len(KafkaUtil.StateTopicProc[topic]); no++ { if KafkaUtil.StateTopicProc[topic] == false { fmt.Printf("Dataex Kafka2ES Process Start,Topic:%s,ConsumerGroup:%s.\n", topic, "group_"+topic) KafkaUtil.StateTopicProc[topic] = true KafkaUtil.CountTopicProc[topic] = 0 go KafkaUtil.Consume(topic, "group_"+topic) time.Sleep(time.Second * 10) } //} } } func LogProcess() { for k,v:= range KafkaUtil.CountTopicProc{ //if len(KafkaUtil.CountTopicProc[k])>0{ //for no:=0;no< len(v);no++{ fmt.Println("[Kafka] ["+k+"] "+time.Now().Format("2006/01/02 15:04:05")+" Process message total:",v) //} //} } } func Contains(array interface{}, val interface{}) (index int) { index = -1 switch reflect.TypeOf(array).Kind() { case reflect.Slice: { s := reflect.ValueOf(array) for i := 0; i < s.Len(); i++ { if reflect.DeepEqual(val, s.Index(i).Interface()) { index = i return index } } } } return index }