// Package KafkaUtil provides small helpers for administering Kafka topics
// (create, delete, re-initialise) through the first broker listed in
// ConfigUtil.KafkaBrokers.
package KafkaUtil

import (
	"context"
	"dsBaseWeb/Utils/ConfigUtil"
	"fmt"
	"github.com/segmentio/kafka-go"
	"strconv"
	"time"
)

// offsetsTopic is the topic dialled when a connection is needed for a
// cluster-level request rather than for a specific user topic.
const offsetsTopic = "__consumer_offsets"

// KafkaBroker describes the first broker reported by the cluster, formatted
// as "host:port【id】". It is filled in by init and stays empty when the
// cluster could not be queried.
var KafkaBroker string

// init queries the cluster for its broker list and records the first entry in
// KafkaBroker. Failures are reported on stdout instead of panicking so that an
// unreachable Kafka does not crash every program importing this package.
func init() {
	conn, err := dialLeader(offsetsTopic)
	if err != nil {
		fmt.Println("KafkaUtil: init: " + err.Error())
		return
	}
	defer closeConn(conn)

	brokers, err := conn.Brokers()
	if err != nil {
		fmt.Println("KafkaUtil: init: listing brokers: " + err.Error())
		return
	}
	if len(brokers) == 0 {
		fmt.Println("KafkaUtil: init: cluster reported no brokers")
		return
	}

	first := brokers[0]
	KafkaBroker = first.Host + ":" + strconv.Itoa(first.Port) + "【" + strconv.Itoa(first.ID) + "】"
}

// InitTopic re-creates topic with the configured partition count and
// replication factor when the topic currently has exactly one partition and
// the last offset of partition 0 is 0 (i.e. nothing has been written to it).
// Presumably this targets topics that were auto-created with broker defaults
// — TODO confirm against the cluster configuration.
//
// If the topic state cannot be read, the topic is left untouched: an unknown
// offset must never be treated as "empty", otherwise data could be deleted.
func InitTopic(topic string) {
	pristine, err := topicIsPristine(topic)
	if err != nil {
		fmt.Println("KafkaUtil: InitTopic: " + err.Error())
		return
	}
	if !pristine {
		return
	}

	DeleteTopic(topic)
	// Short pause between the delete and the create requests, kept from the
	// original implementation.
	time.Sleep(100 * time.Millisecond)
	CreateTopic(topic)
}

// CreateTopic creates topic with ConfigUtil.KafkaParts partitions and a
// replication factor of ConfigUtil.KafkaReply. Errors are printed to stdout.
//
// NOTE(review): the request is sent to the leader of __consumer_offsets
// partition 0; the kafka-go documentation sends topic creation to the cluster
// controller instead — confirm this works on multi-broker clusters.
func CreateTopic(topic string) {
	conn, err := dialLeader(offsetsTopic)
	if err != nil {
		fmt.Println("KafkaUtil: CreateTopic: " + err.Error())
		return
	}
	defer closeConn(conn)

	err = conn.CreateTopics(kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     int(ConfigUtil.KafkaParts),
		ReplicationFactor: int(ConfigUtil.KafkaReply),
	})
	if err != nil {
		fmt.Println(err.Error())
	}
}

// DeleteTopic deletes topic. Errors are printed to stdout.
//
// NOTE(review): the request is sent to the leader of the topic's partition 0
// rather than to the cluster controller — confirm this works on multi-broker
// clusters.
func DeleteTopic(topic string) {
	conn, err := dialLeader(topic)
	if err != nil {
		fmt.Println("KafkaUtil: DeleteTopic: " + err.Error())
		return
	}
	defer closeConn(conn)

	if err := conn.DeleteTopics(topic); err != nil {
		fmt.Println(err.Error())
	}
}

// topicIsPristine reports whether topic has exactly one partition and a last
// offset of 0 on partition 0. The connection is closed before returning so it
// is not held open while the topic is deleted and re-created.
func topicIsPristine(topic string) (bool, error) {
	conn, err := dialLeader(topic)
	if err != nil {
		return false, err
	}
	defer closeConn(conn)

	parts, err := conn.ReadPartitions()
	if err != nil {
		return false, fmt.Errorf("reading partitions of topic %q: %w", topic, err)
	}
	offset, err := conn.ReadLastOffset()
	if err != nil {
		return false, fmt.Errorf("reading last offset of topic %q: %w", topic, err)
	}
	return len(parts) == 1 && offset == 0, nil
}

// dialLeader connects to the leader of partition 0 of topic, using the first
// configured broker as the bootstrap address.
func dialLeader(topic string) (*kafka.Conn, error) {
	if len(ConfigUtil.KafkaBrokers) == 0 {
		return nil, fmt.Errorf("dialing leader of topic %q: no Kafka brokers configured", topic)
	}
	conn, err := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
	if err != nil {
		return nil, fmt.Errorf("dialing leader of topic %q: %w", topic, err)
	}
	return conn, nil
}

// closeConn closes conn and reports, but otherwise ignores, a close error.
func closeConn(conn *kafka.Conn) {
	if err := conn.Close(); err != nil {
		fmt.Println("KafkaUtil: closing connection: " + err.Error())
	}
}