You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
88 lines
1.7 KiB
88 lines
1.7 KiB
package KafkaUtil
|
|
|
|
import (
|
|
"context"
|
|
"dsBaseWeb/Utils/ConfigUtil"
|
|
"fmt"
|
|
"github.com/segmentio/kafka-go"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
// KafkaBroker is a human-readable description of the first broker reported
// by the cluster, formatted as "host:port【id】". It is assigned in init.
var KafkaBroker string
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
|
|
|
|
brokers, _ := KafkaClient.Brokers()
|
|
KafkaBroker = brokers[0].Host + ":" + strconv.Itoa(brokers[0].Port) + "【" + strconv.Itoa(brokers[0].ID) + "】"
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
* @Author zhangjun
|
|
* @Description
|
|
* @Date 2020-08-03 08:47
|
|
* @Param
|
|
* @return
|
|
**/
|
|
func InitTopic(topic string) {
|
|
client, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
|
|
|
|
parts, _ := client.ReadPartitions()
|
|
offset, _ := client.ReadLastOffset()
|
|
|
|
if len(parts) == 1 && offset == 0 {
|
|
|
|
DeleteTopic(topic)
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
CreateTopic(topic)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @Author zhangjun
|
|
* @Description 创建 Kafka Topic
|
|
* @Date 2020-07-29 02:30
|
|
* @Param
|
|
* @return
|
|
**/
|
|
func CreateTopic(topic string) {
|
|
|
|
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
|
|
|
|
err := KafkaClient.CreateTopics(kafka.TopicConfig{
|
|
NumPartitions: int(ConfigUtil.KafkaParts),
|
|
ReplicationFactor: int(ConfigUtil.KafkaReply),
|
|
Topic: topic,
|
|
})
|
|
|
|
if err != nil {
|
|
fmt.Println(err.Error())
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @Author zhangjun
|
|
* @Description 删除Kafka Topic
|
|
* @Date 2020-07-29 02:30
|
|
* @Param
|
|
* @return
|
|
**/
|
|
func DeleteTopic(topic string) {
|
|
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
|
|
|
|
err := KafkaClient.DeleteTopics(topic)
|
|
|
|
if err != nil {
|
|
fmt.Println(err.Error())
|
|
}
|
|
|
|
}
|