You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

66 lines
2.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package UnitTest;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
 * Stand-alone demo consumer: subscribes to a topic, waits until partitions are assigned,
 * seeks every assigned partition to a fixed offset and then prints every record it polls.
 */
public class kafkaConsumerTest {
    /** Broker address used by {@link #initConfig()}. */
    private static final String BOOTSTRAP_SERVERS = "10.10.14.67:9092";
    /** Consumer group id used by {@link #initConfig()}. */
    private static final String GROUP_ID = "dsideal-group";
    /** Client id used by {@link #initConfig()}. */
    private static final String CLIENT_ID = "dsideal1";
    /** Topic consumed by {@link #main(String[])}. */
    private static final String TOPIC = "test";
    /** Offset every assigned partition is positioned at before consumption starts. */
    private static final long START_OFFSET = 10L;
    /** Poll timeout (ms) used while waiting for the partition assignment. */
    private static final long ASSIGNMENT_POLL_TIMEOUT_MS = 100L;
    /** Poll timeout (ms) used by the main fetch loop. */
    private static final long FETCH_POLL_TIMEOUT_MS = 2000L;
    /** Pattern accepted by {@link #getKafkaTimeStamp(String)}. */
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Converts a date string into a Kafka timestamp.
     *
     * <p>Kafka timestamps are expressed in milliseconds since the Unix epoch, so the parsed
     * instant is returned in milliseconds without being scaled down to seconds. The string
     * is interpreted in the JVM default time zone.
     *
     * @param dateString date in {@code yyyy-MM-dd HH:mm:ss} format
     * @return milliseconds since the epoch for the parsed date
     * @throws ParseException if {@code dateString} does not match the expected pattern
     */
    public static long getKafkaTimeStamp(String dateString) throws ParseException {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
        return format.parse(dateString).getTime();
    }

    /**
     * Builds the consumer configuration: broker address, String key/value deserializers,
     * auto-commit disabled, and the group/client ids.
     *
     * @return a new {@link Properties} instance holding the consumer settings
     */
    protected static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
        return properties;
    }

    /**
     * Subscribes to the topic, seeks each assigned partition to {@code START_OFFSET} and then
     * polls forever, printing the batch size and every record's key, value and offset.
     *
     * @param args unused
     * @throws ParseException declared for backward compatibility; not thrown by this method
     */
    public static void main(String[] args) throws ParseException {
        // try-with-resources guarantees the consumer is closed if poll()/seek() throws.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig())) {
            kafkaConsumer.subscribe(Collections.singletonList(TOPIC));

            // Partition assignment is carried out inside poll(), so poll() has to run at least
            // once before seek() can be called. Keep polling until partitions are assigned.
            // NOTE(review): poll(long) is deprecated in kafka-clients 2.0+ in favour of
            // poll(Duration); kept because the client version in use is not visible here.
            Set<TopicPartition> assignment;
            do {
                kafkaConsumer.poll(ASSIGNMENT_POLL_TIMEOUT_MS);
                assignment = kafkaConsumer.assignment();
            } while (assignment.isEmpty());

            // Start consuming every assigned partition from START_OFFSET.
            for (TopicPartition partition : assignment) {
                kafkaConsumer.seek(partition, START_OFFSET);
            }

            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(FETCH_POLL_TIMEOUT_MS);
                System.out.println("本次拉取的消息数量:" + consumerRecords.count());
                System.out.println("消息集合是否为空:" + consumerRecords.isEmpty());
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("消费到的消息key:" + consumerRecord.key() + ",value:" + consumerRecord.value() + ",offset:" + consumerRecord.offset());
                }
            }
        }
    }
}
//https://blog.csdn.net/qq_39839075/article/details/105522855