You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.
input {
kafka {
#Kafka地址
bootstrap_servers => "127.0.0.1:9092"
#队列名称
topics => ["tableData"]
#从第一条开始读取
auto_offset_reset => "earliest"
#消费线程, 一般是这个队列的partitions数
consumer_threads => 1
decorate_events => true
#配置id, kafak会保存消息到哪里, 重启后会接着读取
group_id => "ds-1"
codec => "json"
}
}
filter {
mutate {
#logstash会自动增加一些字段, 去除这些无用的字段
remove_field => ["event","@version","@timestamp"]
}
}
output {
elasticsearch {
hosts => "127.0.0.1:9200"
index => "%{table_name}"
document_id => "%{id}"
}
}
##启动
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/kafka2es.conf -w 1 -b 5000
##后台启动
nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/kafka2es.conf -w 1 -b 5000 > /dev/null 2>&1 &