
package KafkaUtil

import (
    "bytes"
    "context"
    "dsDataex/MyService/DataEX"
    "dsDataex/MyTask/Kafka2ES/Kafka2ESTask"
    "dsDataex/Utils/ConfigUtil"
    "encoding/json"
    "fmt"
    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/snappy"
    "math/rand"
    "strconv"
    "sync"
    "time"
)

var KafkaBroker string
var writerPool map[string]*kafka.Writer
var kafkaPool map[string]map[int]*kafka.Conn
var kafkaParts map[string]int

// controls shutdown of the consume goroutines
//var ChanTopicProc map[string]chan bool

var loc sync.Mutex

// tracks each consume goroutine's state (whether ReadMessage succeeded within
// the last 10 minutes; if not, the goroutine shuts itself down)
//var StateTopicProc map[string]bool

var CountTopicProc map[string]int

func init() {
    KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)

    brokers, _ := KafkaClient.Brokers()
    KafkaBroker = brokers[0].Host + ":" + strconv.Itoa(brokers[0].Port) + "【" + strconv.Itoa(brokers[0].ID) + "】"

    writerPool = make(map[string]*kafka.Writer)
    kafkaPool = make(map[string]map[int]*kafka.Conn)
    kafkaParts = make(map[string]int)

    // also initialize the counter map, so callers can register topics without writing to a nil map
    CountTopicProc = make(map[string]int)
}

/**
 * @Author zhangjun
 * @Description send Kafka messages; if the Topic is still empty, (re)create it first
 * @Date 2020-07-27 05:26
 * @Param
 * @return
 **/
func ProvideLow(topic string, datas []DataEX.KafkaData) (bool, string) {
    var client *kafka.Conn

    clients, flag := kafkaPool[topic]

    if !flag {
        client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)

        clients = make(map[int]*kafka.Conn)
        clients[0] = client

        parts, _ := client.ReadPartitions()
        offset, _ := client.ReadLastOffset()

        if len(parts) == 1 && offset == 0 { // initialize the Topic
            DeleteTopic(topic)

            time.Sleep(100 * time.Millisecond)

            CreateTopic(topic)

            client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)

            clients = make(map[int]*kafka.Conn)
            clients[0] = client

            parts, _ = client.ReadPartitions()
        }

        if len(parts) > 1 { //TODO: pre-loading the whole Kafka connection pool may hurt performance; not implemented for now
        }

        kafkaPool[topic] = clients
        kafkaParts[topic] = len(parts)
    } else {
        max := kafkaParts[topic]
        num := rand.Intn(max)

        c, f := kafkaPool[topic][num]

        if f {
            client = c

            //add by zhangjun 2020-08-02
            //detect timed-out or abnormally closed connections and re-dial
            _, err := client.ReadLastOffset()

            if err != nil {
                client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, num)
                kafkaPool[topic][num] = client
            }
        } else {
            client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, num)
            kafkaPool[topic][num] = client
        }
    }

    //client, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)

    var messages []kafka.Message

    for no := 0; no < len(datas); no++ {
        var data, _ = json.Marshal(datas[no])
        var msg = kafka.Message{Value: data}

        messages = append(messages, msg)
    }

    client.SetWriteDeadline(time.Now().Add(10 * time.Second))

    _, err := client.WriteMessages(messages...)
    //client.Close()

    if err == nil {
        return true, ""
    } else {
        fmt.Println("Kafka data store failed :", err.Error())
        return false, "Kafka data store failed"
    }
}
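
// provideLowExampleSketch is an illustrative caller added alongside this file
// (an assumption, not part of the original code): it shows how the
// (ok, message) pair returned by ProvideLow is typically checked before a
// batch of records is considered stored.
func provideLowExampleSketch(topic string, datas []DataEX.KafkaData) {
    if ok, msg := ProvideLow(topic, datas); !ok {
        fmt.Println("ProvideLow failed, topic:", topic, ", reason:", msg)
    }
}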

/**
 * @Author zhangjun
 * @Description
 * @Date 2020-07-29 02:28
 * @Param
 * @return TODO: the low-level API does not support ConsumerGroup; implementing that ourselves is too complex, so this function is not used!!!
 **/
func ConsumeLow(topic string) {
    KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
    KafkaClient.SetReadDeadline(time.Now().Add(5 * time.Second))

    batch := KafkaClient.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

    for {
        b := make([]byte, 10e3) // 10KB max per message

        _, err := batch.Read(b)
        if err != nil {
            break
        }

        index := bytes.IndexByte(b, 0)
        fmt.Println(string(b[0:index]))
    }

    batch.Close()
    //KafkaClient.Close()
}

/**
 * @Author zhangjun
 * @Description consume Kafka messages; runs as a concurrent goroutine, blocking call
 * @Date 2020-07-29 02:31
 * @Param
 * @return
 **/
func Consume(topic string, group string) {
    defer func() {
        if err := recover(); err != nil {
            fmt.Println("KafkaUtil Consume Panic Recover :", err)
        }

        delete(CountTopicProc, topic)
    }()

    //add by zhangjun 2020-08-03
    //make sure the snappy codec is linked in so compressed messages (e.g. from Kafka-Sarama producers) can be decoded
    snappy.NewCompressionCodec()

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: ConfigUtil.KafkaBrokers,
        Topic:   topic,
        //Partition: 0,
        GroupID:  group, // a Group must be set; otherwise a Partition has to be specified
        MinBytes: 10e3,  // 10KB
        MaxBytes: 10e6,  // 10MB
        //CommitInterval: time.Second,// flushes commits to Kafka every second
    })

    //ticker :=time.NewTicker( 10 * time.Second)
    //count:=0

    //myLoop:
    for {
        // if ReadMessage gets nothing for 10 minutes, stop this Consume goroutine
        ctx, cancel := context.WithTimeout(context.Background(), time.Second*600)

        //select {
        //case f := <-ChanTopicProc[topic]:
        //    if f == true {
        //
        //        loc.Lock()
        //        StateTopicProc[topic] = false
        //        loc.Unlock()
        //
        //        r.Close()
        //        fmt.Printf("Dataex Kafka2ES Process Close ,Topic:%s,ConsumerGroup:%s.\n", topic, group)
        //
        //        return
        //    }
        //case <-ticker.C:
        //    if count == 0 {
        //        r.Close()
        //
        //        StateTopicProc[topic][index] = false
        //        return
        //    } else {
        //        count = 0
        //    }
        //default:

        // blocking read from Kafka
        msg, err := r.ReadMessage(ctx)
        cancel()

        //fmt.Println("KafkaUtil ReadMessage :", topic , msg.Offset)

        if err != nil {
            fmt.Println("KafkaUtil ReadMessage Error :", err.Error())
            break
        }

        //TODO: do not call Process with `go` here: it would outrun ES and data would be lost; consider scaling the ES cluster instead
        Kafka2ESTask.Process(topic, msg)

        _, f := CountTopicProc[topic]

        if !f {
            break
        } else {
            loc.Lock()
            //count++
            CountTopicProc[topic]++
            loc.Unlock()

            //fmt.Printf("message at partition %d offset %d: %s ==> %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
        }
    }

    r.Close()

    fmt.Printf("Dataex Kafka2ES Process Stop ,Topic:%s,ConsumerGroup:%s.\n", topic, group)
}
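
// startConsumeSketch is an illustrative helper added alongside this file (an
// assumption, not part of the original code). Consume blocks inside
// ReadMessage, so it is normally launched in its own goroutine, and the topic
// has to be registered in CountTopicProc first or the read loop exits after
// the first message.
func startConsumeSketch(topic string, group string) {
    loc.Lock()
    CountTopicProc[topic] = 0 // register the topic so the Consume loop keeps running
    loc.Unlock()

    go Consume(topic, group)
}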

/**
 * @Author zhangjun
 * @Description
 * @Date 2020-07-27 11:13
 * @Param
 * @return
 * TODO: the high-level API is too slow (about 1s per call), so this function is not used
 **/
func Provide(topic string, datas []DataEX.KafkaData) (bool, string) {
    var begin = time.Now()

    w, f := writerPool[topic]

    if !f {
        w = kafka.NewWriter(kafka.WriterConfig{
            Brokers:  ConfigUtil.KafkaBrokers,
            Topic:    topic,
            Balancer: &kafka.Hash{}, //.RoundRobin{},//.LeastBytes{},
        })

        writerPool[topic] = w
    }

    var messages []kafka.Message

    for no := 0; no < len(datas); no++ {
        var data, _ = json.Marshal(datas[no])
        var msg = kafka.Message{Value: data}

        messages = append(messages, msg)
    }

    fmt.Println("Time 9:", time.Now(), ",spend:", time.Since(begin))

    err := w.WriteMessages(context.Background(), messages...)

    fmt.Println("Time 10:", time.Now(), ",spend:", time.Since(begin))
    //w.Close()

    if err == nil {
        return true, ""
    } else {
        return false, "Kafka data store error"
    }
}
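
// closeWritersSketch is an illustrative helper added alongside this file (an
// assumption; the original code never closes the pooled writers). A graceful
// shutdown hook could flush and release the high-level writers like this.
func closeWritersSketch() {
    for topic, w := range writerPool {
        if err := w.Close(); err != nil {
            fmt.Println("close Kafka writer failed, topic:", topic, ",", err.Error())
        }
    }
}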

/**
 * @Author zhangjun
 * @Description re-create the Topic with the configured partition count if it is still empty
 * @Date 2020-08-03 08:47
 * @Param
 * @return
 **/
func InitTopic(topic string) {
    client, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)

    parts, _ := client.ReadPartitions()
    offset, _ := client.ReadLastOffset()

    if len(parts) == 1 && offset == 0 {
        DeleteTopic(topic)

        time.Sleep(100 * time.Millisecond)

        CreateTopic(topic)
    }
}

/**
 * @Author zhangjun
 * @Description create a Kafka Topic
 * @Date 2020-07-29 02:30
 * @Param
 * @return
 **/
func CreateTopic(topic string) {
    KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)

    err := KafkaClient.CreateTopics(kafka.TopicConfig{
        NumPartitions:     int(ConfigUtil.KafkaParts),
        ReplicationFactor: int(ConfigUtil.KafkaReply),
        Topic:             topic,
    })

    if err != nil {
        fmt.Println(err.Error())
    }
}

/**
 * @Author zhangjun
 * @Description delete a Kafka Topic
 * @Date 2020-07-29 02:30
 * @Param
 * @return
 **/
func DeleteTopic(topic string) {
    KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)

    err := KafkaClient.DeleteTopics(topic)
    if err != nil {
        fmt.Println(err.Error())
    }
}
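
// recreateTopicSketch is an illustrative helper added alongside this file (an
// assumption; it simply mirrors what InitTopic and ProvideLow already do
// above): drop an empty topic and re-create it so it picks up
// ConfigUtil.KafkaParts partitions and ConfigUtil.KafkaReply replicas.
func recreateTopicSketch(topic string) {
    DeleteTopic(topic)
    time.Sleep(100 * time.Millisecond) // give the broker a moment to finish the delete
    CreateTopic(topic)
}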