parent
d83de5e2b5
commit
d648e06ff4
@ -0,0 +1,87 @@
|
||||
package KafkaUtil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"dsBaseWeb/Utils/ConfigUtil"
|
||||
"fmt"
|
||||
"github.com/segmentio/kafka-go"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
var KafkaBroker string
|
||||
|
||||
|
||||
|
||||
func init() {
|
||||
|
||||
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
|
||||
|
||||
brokers, _ := KafkaClient.Brokers()
|
||||
KafkaBroker = brokers[0].Host + ":" + strconv.Itoa(brokers[0].Port) + "【" + strconv.Itoa(brokers[0].ID) + "】"
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @Author zhangjun
|
||||
* @Description
|
||||
* @Date 2020-08-03 08:47
|
||||
* @Param
|
||||
* @return
|
||||
**/
|
||||
func InitTopic(topic string) {
|
||||
client, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
|
||||
|
||||
parts, _ := client.ReadPartitions()
|
||||
offset, _ := client.ReadLastOffset()
|
||||
|
||||
if len(parts) == 1 && offset == 0 {
|
||||
|
||||
DeleteTopic(topic)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
CreateTopic(topic)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @Author zhangjun
|
||||
* @Description 创建 Kafka Topic
|
||||
* @Date 2020-07-29 02:30
|
||||
* @Param
|
||||
* @return
|
||||
**/
|
||||
func CreateTopic(topic string) {
|
||||
|
||||
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
|
||||
|
||||
err := KafkaClient.CreateTopics(kafka.TopicConfig{
|
||||
NumPartitions: int(ConfigUtil.KafkaParts),
|
||||
ReplicationFactor: int(ConfigUtil.KafkaReply),
|
||||
Topic: topic,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @Author zhangjun
|
||||
* @Description 删除Kafka Topic
|
||||
* @Date 2020-07-29 02:30
|
||||
* @Param
|
||||
* @return
|
||||
**/
|
||||
func DeleteTopic(topic string) {
|
||||
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
|
||||
|
||||
err := KafkaClient.DeleteTopics(topic)
|
||||
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in new issue