You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

370 lines
8.0 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package KafkaUtil
import (
"bytes"
"context"
"dsDataex/MyService/DataEX"
"dsDataex/MyTask/Kafka2ES/Kafka2ESTask"
"dsDataex/Utils/ConfigUtil"
"encoding/json"
"fmt"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/snappy"
"math/rand"
"strconv"
"time"
)
// Package-level state shared by the Kafka helpers in this file.
var (
	// KafkaBroker is a display string for the first broker reported by the
	// cluster, in the form "host:port【id】". It is assigned in init.
	KafkaBroker string

	// writerPool caches one high-level kafka.Writer per topic (used by Provide).
	writerPool map[string]*kafka.Writer

	// kafkaPool caches low-level connections, keyed by topic and then by
	// partition number (used by ProvideLow).
	kafkaPool map[string]map[int]*kafka.Conn

	// kafkaParts records the partition count observed for each topic.
	kafkaParts map[string]int

	// ChanTopicProc controls shutdown of the consume goroutines: one channel
	// per goroutine of a topic. Consume stops when it receives true.
	ChanTopicProc map[string][]chan bool

	// StateTopicProc records the state of each consume goroutine. Consume
	// sets its entry to false when it exits, which includes the case where
	// ReadMessage does not succeed within 10 minutes.
	StateTopicProc map[string][]bool

	// CountTopicProc counts the messages each consume goroutine has processed.
	CountTopicProc map[string][]int
)
// init allocates the package-level pools and records a display string for
// the first broker the cluster reports in KafkaBroker.
//
// The pools are allocated before the broker probe so they are usable even
// when the probe fails. A failed probe is logged and leaves KafkaBroker
// empty instead of panicking on a nil connection or an empty broker list.
func init() {
	writerPool = make(map[string]*kafka.Writer)
	kafkaPool = make(map[string]map[int]*kafka.Conn)
	kafkaParts = make(map[string]int)

	if len(ConfigUtil.KafkaBrokers) == 0 {
		fmt.Println("KafkaUtil init: no Kafka brokers configured")
		return
	}

	conn, err := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
	if err != nil {
		fmt.Println("KafkaUtil init: dial broker failed :", err.Error())
		return
	}
	// The connection is only needed for this one metadata lookup.
	defer conn.Close()

	brokers, err := conn.Brokers()
	if err != nil {
		fmt.Println("KafkaUtil init: list brokers failed :", err.Error())
		return
	}
	if len(brokers) == 0 {
		fmt.Println("KafkaUtil init: cluster reported no brokers")
		return
	}

	first := brokers[0]
	KafkaBroker = first.Host + ":" + strconv.Itoa(first.Port) + "【" + strconv.Itoa(first.ID) + "】"
}
// ProvideLow sends datas to topic through a pooled low-level kafka.Conn.
// Each element of datas is JSON-encoded into the Value of one message.
//
// On the first call for a topic it connects to partition 0. If the topic has
// exactly one partition and a last offset of 0, the topic is deleted and
// recreated via DeleteTopic/CreateTopic before sending. Later calls pick a
// random partition and reuse, or lazily open, its pooled connection.
//
// It returns (true, "") on success and (false, "Kafka数据存储失败") on any
// failure; the underlying error is printed to stdout.
//
// NOTE(review): kafkaPool and kafkaParts are plain maps with no locking in
// this function — confirm callers do not invoke it concurrently.
//
// Author: zhangjun, 2020-07-27.
func ProvideLow(topic string, datas []DataEX.KafkaData) (bool, string) {
	const failMsg = "Kafka数据存储失败"

	client, err := provideLowConn(topic)
	if err != nil {
		fmt.Println("Kafka数据存储失败 :", err.Error())
		return false, failMsg
	}

	messages := make([]kafka.Message, 0, len(datas))
	for i := range datas {
		data, err := json.Marshal(datas[i])
		if err != nil {
			fmt.Println("Kafka数据存储失败 :", err.Error())
			return false, failMsg
		}
		messages = append(messages, kafka.Message{Value: data})
	}

	if err := client.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
		fmt.Println("Kafka数据存储失败 :", err.Error())
		return false, failMsg
	}
	// The connection stays open on purpose: it lives in kafkaPool for reuse.
	if _, err := client.WriteMessages(messages...); err != nil {
		fmt.Println("Kafka数据存储失败 :", err.Error())
		return false, failMsg
	}
	return true, ""
}

// provideLowConn returns a usable connection for topic, taken from kafkaPool
// when possible and dialed otherwise.
func provideLowConn(topic string) (*kafka.Conn, error) {
	clients, known := kafkaPool[topic]
	if !known {
		return provideLowFirstConn(topic)
	}

	partCount := kafkaParts[topic]
	if partCount <= 0 {
		// rand.Intn panics for n <= 0.
		return nil, fmt.Errorf("topic %q has no known partitions", topic)
	}
	num := rand.Intn(partCount)

	if c, ok := clients[num]; ok && c != nil {
		// Added by zhangjun 2020-08-02: probe the pooled connection to
		// detect timeouts and abnormal closes.
		if _, err := c.ReadLastOffset(); err == nil {
			return c, nil
		}
		// Drop the dead connection so it is neither leaked nor reused.
		c.Close()
		delete(clients, num)
	}

	c, err := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, num)
	if err != nil {
		return nil, fmt.Errorf("dial topic %q partition %d: %w", topic, num, err)
	}
	clients[num] = c
	return c, nil
}

// provideLowFirstConn opens the first connection for topic, re-creating the
// topic when it looks freshly auto-created, and registers the result in
// kafkaPool and kafkaParts.
func provideLowFirstConn(topic string) (*kafka.Conn, error) {
	client, err := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
	if err != nil {
		return nil, fmt.Errorf("dial topic %q: %w", topic, err)
	}

	parts, err := client.ReadPartitions()
	if err != nil {
		client.Close()
		return nil, fmt.Errorf("read partitions of %q: %w", topic, err)
	}
	// A failed offset read must not be mistaken for offset 0: that would
	// delete a topic that may already hold data.
	offset, err := client.ReadLastOffset()
	if err != nil {
		client.Close()
		return nil, fmt.Errorf("read last offset of %q: %w", topic, err)
	}

	if len(parts) == 1 && offset == 0 { // initialise the topic
		// Release the old connection before the topic is replaced.
		client.Close()
		DeleteTopic(topic)
		time.Sleep(100 * time.Millisecond)
		CreateTopic(topic)

		client, err = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
		if err != nil {
			return nil, fmt.Errorf("dial recreated topic %q: %w", topic, err)
		}
		parts, err = client.ReadPartitions()
		if err != nil {
			client.Close()
			return nil, fmt.Errorf("read partitions of recreated %q: %w", topic, err)
		}
	}

	if len(parts) == 0 {
		client.Close()
		return nil, fmt.Errorf("topic %q reports no partitions", topic)
	}

	// TODO: pre-loading connections for every partition may hurt
	// performance, so it is not implemented; they are opened lazily.
	kafkaPool[topic] = map[int]*kafka.Conn{0: client}
	kafkaParts[topic] = len(parts)
	return client, nil
}
// ConsumeLow reads one batch from partition 0 of topic over the low-level
// connection API and prints every message to stdout.
//
// The read deadline is 5 seconds; the batch fetches at least 10KB and at most
// 1MB, and each message is read into a 10KB buffer. Reading stops at the
// first error returned by the batch.
//
// TODO: the low-level API has no ConsumerGroup support; it would have to be
// implemented by hand, which is too complex, so this path is not adopted.
//
// Author: zhangjun, 2020-07-29.
func ConsumeLow(topic string) {
	conn, err := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
	if err != nil {
		fmt.Println("KafkaUtil ConsumeLow Dial Error :", err.Error())
		return
	}
	// The connection is local to this call, so release it on return.
	defer conn.Close()

	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
	defer batch.Close()

	buf := make([]byte, 10e3) // 10KB max per message
	for {
		n, err := batch.Read(buf)
		if err != nil {
			break
		}
		// Use the byte count from Read rather than searching the buffer for
		// padding: a message that fills the buffer has no trailing zero byte.
		payload := buf[:n]
		// Output still stops at the first NUL byte inside the message, as
		// it did before.
		if i := bytes.IndexByte(payload, 0); i >= 0 {
			payload = payload[:i]
		}
		fmt.Println(string(payload))
	}
}
/**
* @Author zhangjun
* @Description 获取 Kafka 消息GoRoutine并发 线程阻塞式调用
* @Date 2020-07-29 02:31
* @Param
* @return
**/
func Consume(topic string, group string, index int) {
defer func() {
if err := recover(); err != nil {
fmt.Println("KafkaUtil Consume Panic Recover :", err)
StateTopicProc[topic][index] = false
}
}()
//add by zhangjun 2020-08-03
//Kafka-Sarama 解析压缩消息
snappy.NewCompressionCodec()
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: ConfigUtil.KafkaBrokers,
Topic: topic,
//Partition: 0,
GroupID: group, //必须指定 Group否则需要指定 Partition
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
//ticker :=time.NewTicker( 10 * time.Second)
//count:=0
myLoop:
for {
//10分钟无法ReadMessage ,关闭 Consume 线程
ctx, cancle := context.WithTimeout(context.Background(), time.Minute*10)
defer cancle()
select {
case f := <-ChanTopicProc[topic][index]:
if f == true {
r.Close()
StateTopicProc[topic][index] = false
fmt.Printf("Dataex Kafka2ES Process Close ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index)
return
}
//case <- ticker.C:
// if count==0{
// r.Close()
//
// StateTopicProc[topic][index]=false
// return
// }else {
// count=0
// }
default:
msg, err := r.ReadMessage( ctx )
if err != nil {
fmt.Println("KafkaUtil ReadMessage Error :",err.Error())
break myLoop
}
//TODO:此处不能使用 go否则速度太快数据会丢失以后可以考虑优化ES集群
Kafka2ESTask.Process(topic, msg)
//count++
CountTopicProc[topic][index] ++
//fmt.Printf("message at partiton %d offset %d: %s ==> %s\n",msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}
StateTopicProc[topic][index] = false
r.Close()
fmt.Printf("Dataex Kafka2ES Process Stop ,Topic:%s,ConsumerGroup:%s,GoroutineNO:%d.\n", topic, group, index)
}
// Provide sends datas to topic through the high-level kafka.Writer API,
// keeping one writer per topic in writerPool. Each element of datas is
// JSON-encoded into the Value of one message.
//
// It returns (true, "") on success and (false, "Kafka数据存储错误") on
// failure; the underlying error is printed to stdout. Two timing lines
// ("Time 9", "Time 10") are printed around the write.
//
// TODO: the high-level API is too slow (about 1s per call), so it is not
// adopted.
//
// Author: zhangjun, 2020-07-27.
func Provide(topic string, datas []DataEX.KafkaData) (bool, string) {
	const failMsg = "Kafka数据存储错误"
	begin := time.Now()

	w, ok := writerPool[topic]
	if !ok {
		w = kafka.NewWriter(kafka.WriterConfig{
			Brokers:  ConfigUtil.KafkaBrokers,
			Topic:    topic,
			Balancer: &kafka.Hash{}, //.RoundRobin{},//.LeastBytes{},
		})
		writerPool[topic] = w
	}

	messages := make([]kafka.Message, 0, len(datas))
	for i := range datas {
		data, err := json.Marshal(datas[i])
		if err != nil {
			fmt.Println("Kafka数据存储错误 :", err.Error())
			return false, failMsg
		}
		messages = append(messages, kafka.Message{Value: data})
	}

	fmt.Println("Time 9:", time.Now(), ",spend:", time.Since(begin))
	err := w.WriteMessages(context.Background(), messages...)
	fmt.Println("Time 10:", time.Now(), ",spend:", time.Since(begin))

	// The writer stays open on purpose: it lives in writerPool for reuse.
	if err != nil {
		// Log the cause, as ProvideLow does, instead of dropping it.
		fmt.Println("Kafka数据存储错误 :", err.Error())
		return false, failMsg
	}
	return true, ""
}
// InitTopic re-creates topic when it has exactly one partition and a last
// offset of 0 on partition 0: it calls DeleteTopic, waits 100ms, then calls
// CreateTopic.
//
// If the broker cannot be reached or the topic metadata cannot be read, the
// error is printed and the topic is left untouched. A failed offset read is
// never treated as offset 0, so a topic is not deleted by mistake.
//
// Author: zhangjun, 2020-08-03.
func InitTopic(topic string) {
	client, err := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
	if err != nil {
		fmt.Println("KafkaUtil InitTopic Dial Error :", err.Error())
		return
	}

	parts, partsErr := client.ReadPartitions()
	offset, offsetErr := client.ReadLastOffset()
	// The probe connection is no longer needed; release it before the topic
	// is possibly deleted.
	client.Close()

	if partsErr != nil {
		fmt.Println("KafkaUtil InitTopic ReadPartitions Error :", partsErr.Error())
		return
	}
	if offsetErr != nil {
		fmt.Println("KafkaUtil InitTopic ReadLastOffset Error :", offsetErr.Error())
		return
	}

	if len(parts) == 1 && offset == 0 {
		DeleteTopic(topic)
		time.Sleep(100 * time.Millisecond)
		CreateTopic(topic)
	}
}
// CreateTopic creates the Kafka topic named topic with
// ConfigUtil.KafkaParts partitions and a replication factor of
// ConfigUtil.KafkaReply.
//
// The request goes over a connection dialed to the leader of partition 0 of
// "__consumer_offsets". Dial and creation errors are printed to stdout;
// nothing is returned.
//
// Author: zhangjun, 2020-07-29.
func CreateTopic(topic string) {
	conn, err := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
	if err != nil {
		// Without this check a failed dial leaves a nil connection and the
		// next call panics.
		fmt.Println(err.Error())
		return
	}
	defer conn.Close()

	err = conn.CreateTopics(kafka.TopicConfig{
		NumPartitions:     int(ConfigUtil.KafkaParts),
		ReplicationFactor: int(ConfigUtil.KafkaReply),
		Topic:             topic,
	})
	if err != nil {
		fmt.Println(err.Error())
	}
}
// DeleteTopic deletes the Kafka topic named topic.
//
// The request goes over a connection dialed to the leader of partition 0 of
// that topic. Dial and deletion errors are printed to stdout; nothing is
// returned.
//
// Author: zhangjun, 2020-07-29.
func DeleteTopic(topic string) {
	conn, err := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
	if err != nil {
		// Without this check a failed dial leaves a nil connection and the
		// next call panics.
		fmt.Println(err.Error())
		return
	}
	defer conn.Close()

	if err := conn.DeleteTopics(topic); err != nil {
		fmt.Println(err.Error())
	}
}