You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
dsMin/dsTools/Test/kafka_consumer_test.go

43 lines
987 B

package Test
import (
"fmt"
"github.com/Shopify/sarama"
"testing"
)
func TestKafkaConsumer(t *testing.T) {
fmt.Printf("consumer_test")
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V0_11_0_2
// consumer
consumer, err := sarama.NewConsumer([]string{"10.10.6.202:9092"}, config)
if err != nil {
fmt.Printf("consumer_test create consumer error %s\n", err.Error())
return
}
defer consumer.Close()
partitionConsumer, err := consumer.ConsumePartition("base-db", 0, sarama.OffsetOldest)
if err != nil {
fmt.Printf("try create partition_consumer error %s\n", err.Error())
return
}
defer partitionConsumer.Close()
for {
select {
case msg := <-partitionConsumer.Messages():
fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
case err := <-partitionConsumer.Errors():
fmt.Printf("err :%s\n", err.Error())
}
}
}