You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
43 lines
987 B
43 lines
987 B
package Test
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/Shopify/sarama"
|
|
"testing"
|
|
)
|
|
|
|
func TestKafkaConsumer(t *testing.T) {
|
|
fmt.Printf("consumer_test")
|
|
|
|
config := sarama.NewConfig()
|
|
config.Consumer.Return.Errors = true
|
|
config.Version = sarama.V0_11_0_2
|
|
|
|
// consumer
|
|
consumer, err := sarama.NewConsumer([]string{"10.10.6.202:9092"}, config)
|
|
if err != nil {
|
|
fmt.Printf("consumer_test create consumer error %s\n", err.Error())
|
|
return
|
|
}
|
|
|
|
defer consumer.Close()
|
|
|
|
partitionConsumer, err := consumer.ConsumePartition("base-db", 0, sarama.OffsetOldest)
|
|
if err != nil {
|
|
fmt.Printf("try create partition_consumer error %s\n", err.Error())
|
|
return
|
|
}
|
|
defer partitionConsumer.Close()
|
|
|
|
for {
|
|
select {
|
|
case msg := <-partitionConsumer.Messages():
|
|
fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
|
|
msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
|
|
case err := <-partitionConsumer.Errors():
|
|
fmt.Printf("err :%s\n", err.Error())
|
|
}
|
|
}
|
|
|
|
}
|