From 48b63fd6eb9f58c2c444499f0266c0f785a650a3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com>
Date: Tue, 9 May 2023 18:25:48 +0800
Subject: [PATCH] 'commit'

---
 src/main/java/UnitTest/KafkaTest.java         | 85 -------------------
 src/main/java/UnitTest/kafkaConsumerTest.java | 65 ++++++++++++++
 2 files changed, 65 insertions(+), 85 deletions(-)
 delete mode 100644 src/main/java/UnitTest/KafkaTest.java
 create mode 100644 src/main/java/UnitTest/kafkaConsumerTest.java

diff --git a/src/main/java/UnitTest/KafkaTest.java b/src/main/java/UnitTest/KafkaTest.java
deleted file mode 100644
index de5da1f..0000000
--- a/src/main/java/UnitTest/KafkaTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package UnitTest;
-
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaTest {
-
-    /**
-     * Gets a Kafka timestamp for the given date string.
-     *
-     * @param dateString
-     * @return
-     * @throws ParseException
-     */
-    public static long getKafkaTimeStamp(String dateString) throws ParseException {
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        Date date = sdf.parse(dateString);
-        long millis = date.getTime();
-        long kafkaTimestamp = millis / 1000;
-        return kafkaTimestamp;
-    }
-
-    public static void main(String[] args) throws ParseException {
-
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "dsideal-group");
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "dsideal1"); // client ID used to identify this consumer
-        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
-        // configure the consumer timestamp type as server_time (server-side time)
-        //props.setProperty(ConsumerConfig.TIMESTAMP_TYPE_CONFIG, "server_time");
-
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-
-        /*
-        // subscribe to the topic to consume
-        consumer.subscribe(Collections.singletonList("test"));
-        // consume messages
-        while (true) {
-            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-            for (ConsumerRecord<String, String> record : records) {
-                String key = record.key();
-                String value = record.value();
-                long offset = record.offset();
-                int partition = record.partition();
-                System.out.printf("key=%s,value=%s,offset=%d,partition=%d\n", key, value, offset, partition);
-                // process the received message
-            }
-        }*/
-
-        // close the Consumer
-        //consumer.close();
-
-        Duration pollTimeout = Duration.ofMillis(100);
-        Map<TopicPartition, Long> startOffsets = Collections.singletonMap(new TopicPartition("test", 0), 1651363200000L);
-        consumer.assign(startOffsets.keySet());
-        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamps = consumer.offsetsForTimes(startOffsets);
-        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetAndTimestamps.entrySet()) {
-            TopicPartition tp = entry.getKey();
-            OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
-            if (offsetAndTimestamp != null) {
-                consumer.seek(tp, offsetAndTimestamp.offset());
-            }
-        }
-
-        while (true) {
-            ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
-            for (ConsumerRecord<String, String> record : records) {
-                System.out.printf("topic=%s, partition=%s, offset=%d, timestamp=%s, key=%s, value=%s%n",
-                        record.topic(), record.partition(), record.offset(), record.timestamp(), record.key(), record.value());
-            }
-        }
-    }
-}
diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java
new file mode 100644
index 0000000..a97cb0f
--- /dev/null
+++ b/src/main/java/UnitTest/kafkaConsumerTest.java
@@ -0,0 +1,65 @@
+package UnitTest;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class kafkaConsumerTest {
+
+    /**
+     * Converts a "yyyy-MM-dd HH:mm:ss" date string into a Kafka timestamp.
+     *
+     * @param dateString
+     * @return
+     * @throws ParseException
+     */
+    public static long getKafkaTimeStamp(String dateString) throws ParseException {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        Date date = sdf.parse(dateString);
+        // Kafka timestamps are epoch milliseconds, so return the parsed millis directly
+        long kafkaTimestamp = date.getTime();
+        return kafkaTimestamp;
+    }
+
+    protected static Properties initConfig() {
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dsideal-group");
+        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "dsideal1");
+        return properties;
+    }
+
+    public static void main(String[] args) throws ParseException {
+        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
+        String topic = "test";
+        kafkaConsumer.subscribe(Arrays.asList(topic));
+        // Partition assignment happens inside poll(), so poll() must run at least once before seek().
+        // Fetch the partitions currently assigned to this consumer:
+        Set<TopicPartition> assignment = new HashSet<>();
+        while (assignment.size() == 0) {
+            // keep polling until partitions have been assigned
+            kafkaConsumer.poll(100L);
+            assignment = kafkaConsumer.assignment();
+        }
+        for (TopicPartition tp : assignment) {
+            // start consuming each assigned partition from offset 10
+            kafkaConsumer.seek(tp, 10);
+        }
+        while (true) {
+            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(2000L);
+            System.out.println("records fetched in this poll: " + consumerRecords.count());
+            System.out.println("record set empty: " + consumerRecords.isEmpty());
+            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
+                System.out.println("consumed record key: " + consumerRecord.key() + ", value: " + consumerRecord.value() + ", offset: " + consumerRecord.offset());
+            }
+        }
+    }
+}
+//https://blog.csdn.net/qq_39839075/article/details/105522855
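
Note: the new kafkaConsumerTest seeks every assigned partition to the fixed offset 10 and never actually calls getKafkaTimeStamp(), while the deleted KafkaTest demonstrated timestamp-based seeking via offsetsForTimes(). A minimal sketch of how the two could be combined to resume consumption from a wall-clock time follows. It is not part of the patch: it reuses the broker address, topic, and group from the patch, relies on the corrected getKafkaTimeStamp() returning epoch milliseconds, and the class name SeekByTimeSketch and the example date string are hypothetical.

package UnitTest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.text.ParseException;
import java.time.Duration;
import java.util.*;

// Hypothetical example class, not part of the patch: seeks every partition of a topic
// to the first offset whose timestamp is at or after a given wall-clock time.
public class SeekByTimeSketch {

    public static void main(String[] args) throws ParseException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092"); // broker from the patch
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dsideal-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        String topic = "test";
        // the patch's helper returns epoch milliseconds, which is what offsetsForTimes() expects
        long ts = kafkaConsumerTest.getKafkaTimeStamp("2023-05-01 00:00:00"); // example date

        // build one timestamp query per partition of the topic, then assign them manually
        Map<TopicPartition, Long> query = new HashMap<>();
        for (PartitionInfo pi : consumer.partitionsFor(topic)) {
            query.put(new TopicPartition(pi.topic(), pi.partition()), ts);
        }
        consumer.assign(query.keySet());

        // look up, per partition, the earliest offset with timestamp >= ts, then seek to it
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : offsets.entrySet()) {
            if (e.getValue() != null) {
                consumer.seek(e.getKey(), e.getValue().offset());
            } else {
                // no message at or after ts in this partition: start from the log end
                consumer.seekToEnd(Collections.singleton(e.getKey()));
            }
        }

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d, offset=%d, timestamp=%d, value=%s%n",
                        record.partition(), record.offset(), record.timestamp(), record.value());
            }
        }
    }
}

offsetsForTimes() returns a null entry for a partition that has no message at or after the queried timestamp, which is why the sketch falls back to seekToEnd() in that case rather than calling seek() on a null result.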