// Package Test holds a manual integration test that writes logrus entries to
// a Kafka topic through a kafkalogrus hook and reads them back with sarama.
package Test

import (
	"fmt"
	"testing"
	"time"

	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"
	"github.com/tracer0tong/kafkalogrus"
)

var (
	// KafkaAddress is the host:port of the Kafka broker used by the test.
	KafkaAddress = "10.10.6.202:9092"
	// KafkaTopic is the topic the log entries are written to and read from.
	KafkaTopic = "dsLogBase"
)

// readIdleTimeout is how long readKafka waits without receiving a message
// before it assumes the partition has been drained and returns.
const readIdleTimeout = 5 * time.Second

// TestLogrusKafka dumps the contents of KafkaTopic to stdout. The producer
// half (writeKafka) is left disabled; enable it to publish sample entries
// before reading.
func TestLogrusKafka(t *testing.T) {
	//writeKafka()
	readKafka()
}

// writeKafka attaches a Kafka hook to a fresh logrus logger and emits one
// entry per level (Debug, Info, Warn, Error) tagged with the topic name.
// It panics if the hook cannot be created.
func writeKafka() {
	hook, err := kafkalogrus.NewKafkaLogrusHook(
		"kh",
		logrus.AllLevels,
		&logrus.JSONFormatter{},
		[]string{KafkaAddress},
		KafkaTopic,
		true,
		nil)
	if err != nil {
		panic(err)
	}

	logger := logrus.New()
	logger.Hooks.Add(hook)

	l := logger.WithField("topic", KafkaTopic)
	// The logger level is left at its default, so per its message the Debug
	// entry is expected to be dropped.
	l.Debug("This must not be logged")
	l.Info("This is an Info msg")
	l.Warn("This is a Warn msg")
	l.Error("This is an Error msg")

	// Presumably gives the hook time to deliver the entries before the
	// function returns.
	time.Sleep(time.Second)
}

// readKafka consumes partition 0 of KafkaTopic from the oldest offset and
// prints every message and consumer error to stdout. It returns when no
// message has arrived for readIdleTimeout, or when either consumer channel
// is closed, so the calling test terminates instead of blocking forever.
//
// Note: this function does not persist the partition/offset it has reached.
// To avoid consuming the same data again on the next run, the caller must
// record them itself (for example in a file).
func readKafka() {
	fmt.Printf("consumer_test\n")

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_11_0_2

	consumer, err := sarama.NewConsumer([]string{KafkaAddress}, config)
	if err != nil {
		fmt.Printf("consumer_test create consumer error %s\n", err.Error())
		return
	}
	defer func() {
		if cerr := consumer.Close(); cerr != nil {
			fmt.Printf("consumer_test close consumer error %s\n", cerr.Error())
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition(KafkaTopic, 0, sarama.OffsetOldest)
	if err != nil {
		fmt.Printf("try create partition_consumer error %s\n", err.Error())
		return
	}
	// Deferred after the consumer's Close so it runs first (LIFO order).
	defer func() {
		if cerr := partitionConsumer.Close(); cerr != nil {
			fmt.Printf("consumer_test close partition_consumer error %s\n", cerr.Error())
		}
	}()

	idle := time.NewTimer(readIdleTimeout)
	defer idle.Stop()

	for {
		select {
		case msg, ok := <-partitionConsumer.Messages():
			if !ok {
				// Channel closed: nothing more will arrive.
				return
			}
			fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
				msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))

			// Restart the idle window. Drain a tick that may have fired
			// concurrently so the next wait is a full readIdleTimeout.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(readIdleTimeout)

		case cerr, ok := <-partitionConsumer.Errors():
			if !ok {
				return
			}
			fmt.Printf("err :%s\n", cerr.Error())

		case <-idle.C:
			fmt.Printf("consumer_test no message for %s, stopping\n", readIdleTimeout)
			return
		}
	}
}