```
input {
  kafka {
    # Kafka broker address
    bootstrap_servers => "127.0.0.1:9092"
    # Topic name
    topics => ["tableData"]
    # Start reading from the earliest message
    auto_offset_reset => "earliest"
    # Consumer threads; usually set to the topic's partition count
    consumer_threads => 1
    # Add Kafka metadata (topic, partition, offset) to each event
    decorate_events => true
    # Consumer group id; Kafka tracks the committed offset for this group,
    # so consumption resumes where it left off after a restart
    group_id => "ds-1"
    codec => "json"
  }
}

filter {
  mutate {
    # Logstash adds some fields automatically; drop the ones we don't need
    remove_field => ["event", "@version", "@timestamp"]
  }
}

output {
  elasticsearch {
    hosts => "127.0.0.1:9200"
    index => "%{table_name}"
    document_id => "%{id}"
  }
}
```

## Start

`-w` sets the number of pipeline workers and `-b` the batch size per worker:

```
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/kafka2es.conf -w 1 -b 5000
```

## Start in the background

```
nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/kafka2es.conf -w 1 -b 5000 > /dev/null 2>&1 &
```
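The output section routes each event to the index named by `%{table_name}` and uses `%{id}` as the document id, so every message published to `tableData` must be a JSON object carrying both fields. A quick way to test is the console producer that ships with Kafka; a minimal sketch, assuming Kafka is installed under `/usr/local/kafka` (the sample row below is a hypothetical example):

```
# Type one JSON message per line at the > prompt
/usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic tableData
> {"table_name": "user", "id": "1001", "name": "alice"}
```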
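To confirm that documents are arriving, you can list the indices Logstash has created, assuming Elasticsearch is on its default HTTP port:

```
curl -s 'http://127.0.0.1:9200/_cat/indices?v'
```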