// Package KafkaUtil collects small helpers around github.com/segmentio/kafka-go:
// low-level connection based produce/consume, high-level Reader/Writer based
// produce/consume, and topic creation/deletion.
//
// None of the exported functions return an error; failures are reported on
// standard output and the function returns early.
package KafkaUtil

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

const (
	// dataBrokerAddr is the broker used by the produce/consume helpers.
	dataBrokerAddr = "192.168.0.200:9092"

	// adminBrokerAddr is the broker used by CreateTopic and DeleteTopic.
	// NOTE(review): this differs from dataBrokerAddr — confirm that the two
	// addresses are intentionally different.
	adminBrokerAddr = "10.10.14.238:9092"

	// connIOTimeout bounds a single low-level read or write on KafkaClient.
	connIOTimeout = 5 * time.Second
)

var (
	// KafkaClient is the most recently dialed low-level broker connection.
	// It is set by ProvideLow, CreateTopic and DeleteTopic, read by
	// ConsumeLow, and is nil until a dial has succeeded.
	KafkaClient *kafka.Conn

	// CTX is the package-wide base context; init sets it to
	// context.Background().
	CTX context.Context
)

func init() {
	CTX = context.Background()
}

// redialLeader points KafkaClient at the leader of partition 0 of topic on the
// broker at address.
//
// A connection left over from an earlier call is closed first so that repeated
// calls do not leak sockets. It reports whether the dial succeeded; on failure
// the error is printed and KafkaClient is left nil.
func redialLeader(ctx context.Context, address, topic string) bool {
	if KafkaClient != nil {
		// Best effort: the old connection is being discarded either way.
		_ = KafkaClient.Close()
		KafkaClient = nil
	}

	conn, err := kafka.DialLeader(ctx, "tcp", address, topic, 0)
	if err != nil {
		fmt.Println("kafka: dial leader:", err.Error())
		return false
	}
	KafkaClient = conn
	return true
}

// ProvideLow writes three fixed messages ("one!", "two!", "three!") to
// partition 0 of topic over a low-level connection.
//
// The connection is stored in KafkaClient and deliberately left open so that
// ConsumeLow can reuse it.
func ProvideLow(topic string) {
	if !redialLeader(context.Background(), dataBrokerAddr, topic) {
		return
	}

	if err := KafkaClient.SetWriteDeadline(time.Now().Add(connIOTimeout)); err != nil {
		fmt.Println("kafka: set write deadline:", err.Error())
		return
	}

	_, err := KafkaClient.WriteMessages(
		kafka.Message{Value: []byte("one!")},
		kafka.Message{Value: []byte("two!")},
		kafka.Message{Value: []byte("three!")},
	)
	if err != nil {
		fmt.Println("kafka: write messages:", err.Error())
	}
}

// ConsumeLow reads one batch from the connection held in KafkaClient and prints
// each message value on its own line.
//
// It requires a connection opened by an earlier call (for example ProvideLow);
// without one it prints a notice and returns instead of dereferencing nil.
func ConsumeLow() {
	if KafkaClient == nil {
		fmt.Println("kafka: ConsumeLow: no open connection")
		return
	}

	if err := KafkaClient.SetReadDeadline(time.Now().Add(connIOTimeout)); err != nil {
		fmt.Println("kafka: set read deadline:", err.Error())
		return
	}

	batch := KafkaClient.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
	defer func() {
		if err := batch.Close(); err != nil {
			fmt.Println("kafka: close batch:", err.Error())
		}
	}()

	// One buffer is reused for every message. The length of each message comes
	// from Read's return value, so payloads that contain NUL bytes or that fill
	// the buffer exactly are printed intact.
	buf := make([]byte, 10e3) // 10KB max per message
	for {
		n, err := batch.Read(buf)
		if err != nil {
			// Any error, including the end of the batch, stops consumption.
			break
		}
		fmt.Println(string(buf[:n]))
	}
}

// Consume reads messages from the "log_login" topic as a member of consumer
// group "Group_04" and prints partition, offset, key and value of each one.
//
// It loops until ReadMessage returns an error, prints that error, and closes
// the reader.
func Consume() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{dataBrokerAddr},
		Topic:   "log_login",
		// A GroupID must be set; otherwise a Partition has to be specified.
		GroupID:  "Group_04",
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	defer func() {
		if err := r.Close(); err != nil {
			fmt.Println("kafka: close reader:", err.Error())
		}
	}()

	for {
		m, err := r.ReadMessage(CTX)
		if err != nil {
			fmt.Println("kafka: read message:", err.Error())
			break
		}
		fmt.Printf("message at partiton %d offset %d: %s ==> %s\n", m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}

// Provide writes three fixed messages to the "log_login" topic through a
// high-level Writer and then closes the writer.
func Provide() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{dataBrokerAddr},
		Topic:    "log_login",
		Balancer: &kafka.LeastBytes{}, // chooses the target partition automatically
	})
	defer func() {
		if err := w.Close(); err != nil {
			fmt.Println("kafka: close writer:", err.Error())
		}
	}()

	// No Key is set on any message; partition choice is left to the balancer.
	err := w.WriteMessages(context.Background(),
		kafka.Message{Value: []byte("Hello World! One")},
		kafka.Message{Value: []byte("Hello World! Two")},
		kafka.Message{Value: []byte("Hello World! Three")},
	)
	if err != nil {
		fmt.Println("kafka: write messages:", err.Error())
	}
}

// CreateTopic creates topic with 8 partitions and a replication factor of 1.
//
// The request is sent over a connection to the leader of partition 0 of
// "__consumer_offsets"; that connection is stored in KafkaClient and left open.
// NOTE(review): topic administration is normally sent to the cluster
// controller — confirm this works on multi-broker clusters.
func CreateTopic(topic string) {
	if !redialLeader(CTX, adminBrokerAddr, "__consumer_offsets") {
		return
	}

	err := KafkaClient.CreateTopics(kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     8,
		ReplicationFactor: 1,
	})
	if err != nil {
		fmt.Println(err.Error())
	}
}

// DeleteTopic deletes topic.
//
// The request is sent over a connection to the leader of partition 0 of the
// topic being deleted; that connection is stored in KafkaClient and left open.
func DeleteTopic(topic string) {
	if !redialLeader(CTX, adminBrokerAddr, topic) {
		return
	}

	if err := KafkaClient.DeleteTopics(topic); err != nil {
		fmt.Println(err.Error())
	}
}