You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

88 lines
1.7 KiB

package KafkaUtil
import (
"context"
"dsBaseWeb/Utils/ConfigUtil"
"fmt"
"github.com/segmentio/kafka-go"
"strconv"
"time"
)
var KafkaBroker string
func init() {
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
brokers, _ := KafkaClient.Brokers()
KafkaBroker = brokers[0].Host + ":" + strconv.Itoa(brokers[0].Port) + "【" + strconv.Itoa(brokers[0].ID) + "】"
}
/**
* @Author zhangjun
* @Description
* @Date 2020-08-03 08:47
* @Param
* @return
**/
func InitTopic(topic string) {
client, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
parts, _ := client.ReadPartitions()
offset, _ := client.ReadLastOffset()
if len(parts) == 1 && offset == 0 {
DeleteTopic(topic)
time.Sleep(100 * time.Millisecond)
CreateTopic(topic)
}
}
/**
* @Author zhangjun
* @Description 创建 Kafka Topic
* @Date 2020-07-29 02:30
* @Param
* @return
**/
func CreateTopic(topic string) {
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
err := KafkaClient.CreateTopics(kafka.TopicConfig{
NumPartitions: int(ConfigUtil.KafkaParts),
ReplicationFactor: int(ConfigUtil.KafkaReply),
Topic: topic,
})
if err != nil {
fmt.Println(err.Error())
}
}
/**
* @Author zhangjun
* @Description 删除Kafka Topic
* @Date 2020-07-29 02:30
* @Param
* @return
**/
func DeleteTopic(topic string) {
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
err := KafkaClient.DeleteTopics(topic)
if err != nil {
fmt.Println(err.Error())
}
}