package KafkaUtil import ( "bytes" "context" "dsDataex/MyService/DataEX" "dsDataex/MyTask/Kafka2ES/Kafka2ESTask" "dsDataex/Utils/ConfigUtil" "encoding/json" "fmt" "github.com/segmentio/kafka-go" "math/rand" "strconv" "time" ) var KafkaBroker string var writerPool map[string]*kafka.Writer var kafkaPool map[string]map[int]*kafka.Conn var kafkaParts map[string]int //控制 consume 子线程 关闭 var ChanTopicProc map[string][]chan bool //记录 consume 子线程状态 ( 10 分钟内是否成功执行 ReadMessage,不执行自动关闭子线程 ) var StateTopicProc map[string][]bool func init() { KafkaClient, _ := kafka.DialLeader(context.Background() , "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets" , 0) brokers,_:= KafkaClient.Brokers() KafkaBroker=brokers[0].Host+":"+strconv.Itoa(brokers[0].Port)+"【"+strconv.Itoa(brokers[0].ID)+"】" writerPool=make(map[string]*kafka.Writer) kafkaPool = make(map[string]map[int]*kafka.Conn) kafkaParts= make(map[string]int) } /** * @Author zhangjun * @Description 发送 Kafka 消息,如果 Topic为空,创建 Topic * @Date 2020-07-27 05:26 * @Param * @return **/ func ProvideLow(topic string,datas []DataEX.KafkaData)(bool,string){ var client *kafka.Conn clients,flag:=kafkaPool[topic] if flag==false{ client,_= kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0) clients=make(map[int]*kafka.Conn) clients[0]=client parts,_:=client.ReadPartitions() offset,_:=client.ReadLastOffset() if len(parts)== 1 && offset==0 {//初始化 Topic DeleteTopic(topic) time.Sleep(100 * time.Millisecond) CreateTopic(topic) client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0) clients=make(map[int]*kafka.Conn) clients[0]=client parts,_=client.ReadPartitions() } if len(parts)>1{//TODO:预先加载 Kafka连接池,可能影响性能,暂不实现!!! } kafkaPool[topic]=clients kafkaParts[topic]= len(parts) }else { max:=kafkaParts[topic] num:=rand.Intn(max) c,f:=kafkaPool[topic][num] if f==true{ client=c }else { client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , num) kafkaPool[topic][num]=client } } //client, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0) var messages []kafka.Message for no:=0;no< len(datas);no++{ var data,_=json.Marshal(datas[no]) var msg=kafka.Message{Value: data } messages=append(messages,msg) } client.SetWriteDeadline(time.Now().Add(5 *time.Second)) _,err:= client.WriteMessages(messages...) //client.Close() if err ==nil{ return true,"" }else { return false,"Kafka数据存储错误" } } /** * @Author zhangjun * @Description * @Date 2020-07-29 02:28 * @Param * @return TODO:低阶接口调用不支持 ConsumerGroup,需要自己实现 ,复杂度太高 ,不采用!!! **/ func ConsumeLow(topic string) { KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0) KafkaClient.SetReadDeadline(time.Now().Add(5 *time.Second)) batch := KafkaClient.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max for { b := make([]byte, 10e3) // 10KB max per message _, err := batch.Read(b) if err != nil { break } index := bytes.IndexByte(b, 0) fmt.Println(string(b[0:index])) } batch.Close() //KafkaClient.Close() } /** * @Author zhangjun * @Description 获取 Kafka 消息,GoRoutine并发, 线程阻塞式调用 !!! * @Date 2020-07-29 02:31 * @Param * @return **/ func Consume(topic string,group string,index int) { defer func(){ if err:=recover();err!=nil{ fmt.Println("KafkaUtil Consume Panic Recover :",err) StateTopicProc[topic][index]=false } }() r := kafka.NewReader(kafka.ReaderConfig{ Brokers: ConfigUtil.KafkaBrokers , Topic: topic, //Partition: 0, GroupID: group,//必须指定 Group,否则需要指定 Partition!!! MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) //ticker :=time.NewTicker( 10 * time.Second) //count:=0 myLoop: for { //10分钟无法ReadMessage ,关闭 Consume 线程 ctx,cancle :=context.WithTimeout( context.Background(),time.Minute * 10 ) defer cancle() select { case f := <- ChanTopicProc[topic][index]: if f == true { r.Close() StateTopicProc[topic][index]=false return } //case <- ticker.C: // if count==0{ // r.Close() // // StateTopicProc[topic][index]=false // return // }else { // count=0 // } default: msg, err := r.ReadMessage(ctx) if err != nil { //fmt.Println("KafkaUtil ReadMessage Error :",err.Error()) break myLoop } //count++ Kafka2ESTask.Process(topic, msg) //fmt.Printf("message at partiton %d offset %d: %s ==> %s\n",msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } } StateTopicProc[topic][index]=false r.Close() fmt.Printf("KafkaUtil Consume Stop ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n",topic,group,index) } /** * @Author zhangjun * @Description * @Date 2020-07-27 11:13 * @Param * @return * TODO:高阶接口调用速度太慢,1s,不采用!!! **/ func Provide(topic string,datas []DataEX.KafkaData)(bool,string){ var begin =time.Now() w,f:= writerPool[topic] if f==false{ w = kafka.NewWriter(kafka.WriterConfig{ Brokers: ConfigUtil.KafkaBrokers, Topic: topic, Balancer: &kafka.Hash{},//.RoundRobin{},//.LeastBytes{}, }) writerPool[topic]=w } var messages []kafka.Message for no:=0;no< len(datas);no++{ var data,_=json.Marshal(datas[no]) var msg=kafka.Message{Value: data } messages=append(messages,msg) } fmt.Println("Time 9:",time.Now(),",spend:",time.Since(begin)) err:= w.WriteMessages( context.Background() ,messages...) fmt.Println("Time 10:",time.Now(),",spend:",time.Since(begin)) //w.Close() if err ==nil{ return true,"" }else { return false,"Kafka数据存储错误" } } /** * @Author zhangjun * @Description 创建 Kafka Topic * @Date 2020-07-29 02:30 * @Param * @return **/ func CreateTopic(topic string){ KafkaClient, _ := kafka.DialLeader( context.Background() , "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets" , 0) err:= KafkaClient.CreateTopics(kafka.TopicConfig{ NumPartitions: int(ConfigUtil.KafkaParts), ReplicationFactor: int(ConfigUtil.KafkaReply), Topic: topic, }) if err != nil{ fmt.Println(err.Error()) } } /** * @Author zhangjun * @Description 删除Kafka Topic * @Date 2020-07-29 02:30 * @Param * @return **/ func DeleteTopic(topic string){ KafkaClient, _ := kafka.DialLeader( context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0) err:= KafkaClient.DeleteTopics(topic) if err != nil{ fmt.Println(err.Error()) } }