
// Package KafkaUtil wraps the segmentio/kafka-go client with simple produce and
// consume helpers plus per-topic connection and writer pools for dsDataex.
package KafkaUtil

import (
    "bytes"
    "context"
    "dsDataex/MyService/DataEX"
    "dsDataex/Utils/ConfigUtil"
    "encoding/json"
    "fmt"
    "github.com/segmentio/kafka-go"
    "math/rand"
    "strconv"
    "time"
)

var KafkaBroker string
var CTX context.Context
var writerPool map[string]*kafka.Writer
var kafkaPool map[string]map[int]*kafka.Conn
var kafkaParts map[string]int

// init dials the first configured broker once to discover the cluster, records
// the leader broker address, and prepares the per-topic pools. Note that dial
// errors are deliberately ignored here, as elsewhere in this file; a failed
// dial will surface as a nil-pointer panic on first use.
func init() {
    CTX = context.Background()

    KafkaClient, _ := kafka.DialLeader(CTX, "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
    brokers, _ := KafkaClient.Brokers()
    KafkaBroker = brokers[0].Host + ":" + strconv.Itoa(brokers[0].Port) + "【" + strconv.Itoa(brokers[0].ID) + "】"

    writerPool = make(map[string]*kafka.Writer)
    kafkaPool = make(map[string]map[int]*kafka.Conn)
    kafkaParts = make(map[string]int)
}

/**
 * @Author zhangjun
 * @Description Low-level producer: serializes the given records to JSON and writes
 *              them to one partition of the topic through a pooled kafka.Conn.
 * @Date 2020-07-27 05:26
 * @Param topic the Kafka topic name; datas the records to publish
 * @return success flag and an error message when the write fails
 **/
func ProvideLow(topic string, datas []DataEX.KafkaData) (bool, string) {
    var client *kafka.Conn

    clients, flag := kafkaPool[topic]
    if !flag {
        // First use of this topic: dial partition 0 and inspect the topic.
        client, _ = kafka.DialLeader(CTX, "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
        clients = make(map[int]*kafka.Conn)
        clients[0] = client

        parts, _ := client.ReadPartitions()
        offset, _ := client.ReadLastOffset()

        if len(parts) == 1 && offset == 0 { // initialize the topic
            DeleteTopic(topic)
            time.Sleep(100 * time.Millisecond)
            CreateTopic(topic)

            client, _ = kafka.DialLeader(CTX, "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
            clients = make(map[int]*kafka.Conn)
            clients[0] = client
            parts, _ = client.ReadPartitions()
        }

        if len(parts) > 1 { // TODO: preloading connections for every partition may hurt performance, so it is not implemented yet
        }

        kafkaPool[topic] = clients
        kafkaParts[topic] = len(parts)
    } else {
        // Topic already known: pick a random partition and reuse or create its connection.
        max := kafkaParts[topic]
        num := rand.Intn(max)

        c, f := kafkaPool[topic][num]
        if f {
            client = c
        } else {
            client, _ = kafka.DialLeader(CTX, "tcp", ConfigUtil.KafkaBrokers[0], topic, num)
            kafkaPool[topic][num] = client
        }
    }
    //client, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)

    var messages []kafka.Message
    for no := 0; no < len(datas); no++ {
        data, _ := json.Marshal(datas[no])
        msg := kafka.Message{Value: data}
        messages = append(messages, msg)
    }

    client.SetWriteDeadline(time.Now().Add(5 * time.Second))
    _, err := client.WriteMessages(messages...)
    //client.Close()

    if err == nil {
        return true, ""
    }
    return false, "Kafka data storage error"
}
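
// Minimal usage sketch for ProvideLow (the caller below is assumed, not part of
// this package; the topic name is hypothetical and the DataEX.KafkaData values
// must be built according to the real struct definition):
//
//    var records []DataEX.KafkaData // filled by the caller
//    ok, msg := KafkaUtil.ProvideLow("dsdataex-demo", records)
//    if !ok {
//        fmt.Println(msg)
//    }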

// ConsumeLow reads a single batch from partition 0 of the topic over a raw
// kafka.Conn and prints each message value to stdout.
func ConsumeLow(topic string) {
    KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
    KafkaClient.SetReadDeadline(time.Now().Add(5 * time.Second))

    batch := KafkaClient.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

    for {
        b := make([]byte, 10e3) // 10KB max per message
        _, err := batch.Read(b)
        if err != nil {
            break
        }
        index := bytes.IndexByte(b, 0)
        fmt.Println(string(b[0:index]))
    }

    batch.Close()
    //KafkaClient.Close()
}

// Consume runs a blocking consumer loop on the topic with the given consumer
// group and prints every message until a read error occurs.
func Consume(topic string, group string) {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: ConfigUtil.KafkaBrokers,
        Topic:   topic,
        //Partition: 0,
        GroupID:  group, // a consumer group must be set; otherwise a partition would have to be specified explicitly
        MinBytes: 10e3,  // 10KB
        MaxBytes: 10e6,  // 10MB
    })

    for {
        m, err := r.ReadMessage(CTX)
        if err != nil {
            break
        }
        fmt.Printf("message at partition %d offset %d: %s ==> %s\n", m.Partition, m.Offset, string(m.Key), string(m.Value))
    }

    r.Close()
}
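
// Usage sketch (assumed caller): Consume blocks until a read error, so it would
// typically be started in its own goroutine; topic and group names here are
// hypothetical.
//
//    go KafkaUtil.Consume("dsdataex-demo", "dsdataex-group")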

/**
 * @Author zhangjun
 * @Description High-level producer built on kafka.Writer with a per-topic writer pool.
 * @Date 2020-07-27 11:13
 * @Param topic the Kafka topic name; datas the records to publish
 * @return success flag and an error message when the write fails
 * TODO: the high-level API is too slow (about 1s per call), so it is not used.
 **/
func Provide(topic string, datas []DataEX.KafkaData) (bool, string) {
    var begin = time.Now()

    // Reuse one kafka.Writer per topic.
    w, f := writerPool[topic]
    if !f {
        w = kafka.NewWriter(kafka.WriterConfig{
            Brokers:  ConfigUtil.KafkaBrokers,
            Topic:    topic,
            Balancer: &kafka.Hash{}, //.RoundRobin{},//.LeastBytes{},
        })
        writerPool[topic] = w
    }

    var messages []kafka.Message
    for no := 0; no < len(datas); no++ {
        data, _ := json.Marshal(datas[no])
        msg := kafka.Message{Value: data}
        messages = append(messages, msg)
    }

    fmt.Println("Time 9:", time.Now(), ",spend:", time.Since(begin))
    err := w.WriteMessages(CTX, messages...)
    fmt.Println("Time 10:", time.Now(), ",spend:", time.Since(begin))
    //w.Close()

    if err == nil {
        return true, ""
    }
    return false, "Kafka data storage error"
}

// CreateTopic creates the topic with 8 partitions and the replication factor
// taken from ConfigUtil.KafkaReply, logging any error.
func CreateTopic(topic string) {
    KafkaClient, _ := kafka.DialLeader(CTX, "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)

    err := KafkaClient.CreateTopics(kafka.TopicConfig{
        NumPartitions:     8,
        ReplicationFactor: int(ConfigUtil.KafkaReply),
        Topic:             topic,
    })
    if err != nil {
        fmt.Println(err.Error())
    }
}

// DeleteTopic removes the topic from the cluster, logging any error.
func DeleteTopic(topic string) {
    KafkaClient, _ := kafka.DialLeader(CTX, "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)

    err := KafkaClient.DeleteTopics(topic)
    if err != nil {
        fmt.Println(err.Error())
    }
}
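
// Usage sketch for manual topic management (assumed caller, hypothetical topic
// name): CreateTopic and DeleteTopic are also called internally by ProvideLow
// when it decides to re-initialize an empty single-partition topic.
//
//    KafkaUtil.CreateTopic("dsdataex-demo") // 8 partitions, configured replication
//    KafkaUtil.DeleteTopic("dsdataex-demo")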