|
|
package Test
|
|
|
|
|
|
import (
|
|
|
"fmt"
|
|
|
"github.com/Shopify/sarama"
|
|
|
"github.com/sirupsen/logrus"
|
|
|
"github.com/tracer0tong/kafkalogrus"
|
|
|
"testing"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
var KafkaAddress = "10.10.6.202:9092"
|
|
|
var KafkaTopic = "dsLogBase"
|
|
|
|
|
|
func TestLogrusKafka(t *testing.T) {
|
|
|
//writeKafka()
|
|
|
readKafka()
|
|
|
}
|
|
|
func writeKafka() {
|
|
|
var err error
|
|
|
var hook *kafkalogrus.KafkaLogrusHook
|
|
|
if hook, err = kafkalogrus.NewKafkaLogrusHook(
|
|
|
"kh",
|
|
|
logrus.AllLevels,
|
|
|
&logrus.JSONFormatter{},
|
|
|
[]string{KafkaAddress},
|
|
|
KafkaTopic,
|
|
|
true, nil); err != nil {
|
|
|
panic(err)
|
|
|
}
|
|
|
logger := logrus.New()
|
|
|
logger.Hooks.Add(hook)
|
|
|
l := logger.WithField("topic", KafkaTopic)
|
|
|
l.Debug("This must not be logged")
|
|
|
l.Info("This is an Info msg")
|
|
|
l.Warn("This is a Warn msg")
|
|
|
l.Error("This is an Error msg")
|
|
|
time.Sleep(time.Second)
|
|
|
}
|
|
|
|
|
|
func readKafka() {
|
|
|
fmt.Printf("consumer_test")
|
|
|
config := sarama.NewConfig()
|
|
|
config.Consumer.Return.Errors = true
|
|
|
config.Version = sarama.V0_11_0_2
|
|
|
// consumer
|
|
|
consumer, err := sarama.NewConsumer([]string{KafkaAddress}, config)
|
|
|
if err != nil {
|
|
|
fmt.Printf("consumer_test create consumer error %s\n", err.Error())
|
|
|
return
|
|
|
}
|
|
|
defer consumer.Close()
|
|
|
partitionConsumer, err := consumer.ConsumePartition(KafkaTopic, 0, sarama.OffsetOldest)
|
|
|
if err != nil {
|
|
|
fmt.Printf("try create partition_consumer error %s\n", err.Error())
|
|
|
return
|
|
|
}
|
|
|
defer partitionConsumer.Close()
|
|
|
for {
|
|
|
select {
|
|
|
case msg := <-partitionConsumer.Messages():
|
|
|
fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
|
|
|
msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
|
|
|
case err := <-partitionConsumer.Errors():
|
|
|
fmt.Printf("err :%s\n", err.Error())
|
|
|
}
|
|
|
}
|
|
|
//注意:需要自行记录partition和offset才能保证不重复消费数据!用一个文件来记录吧!
|
|
|
}
|