package KafkaUtil2 import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) /** * @Author zhangjun * @Description * @Date 2020-07-22 09:58 * @Param TODO: NOT IN USE,需要 gcc 编译 librdkafka,增加了部署复杂度!!!暂不使用!!! * @return **/ func Consume() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": "localhost", "group.id": "myGroup", "auto.offset.reset": "earliest", }) if err != nil { panic(err) } //TODO: 支持正则表达式,模糊匹配 Topic!!!牛!!! c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { // The client will automatically try to recover from all errors. fmt.Printf("Consumer error: %v (%v)\n", err, msg) } } c.Close() } func Provide() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"}) if err != nil { panic(err) } defer p.Close() // Delivery report handler for produced messages go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() // Produce messages to topic (asynchronously) topic := "myTopic" for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } // Wait for message deliveries before shutting down p.Flush(15 * 1000) }