From 6cf26ce326c59fc5d758f36f907724ae18728e30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Tue, 9 May 2023 16:51:36 +0800 Subject: [PATCH] 'commit' --- pom.xml | 13 ++++ src/main/java/UnitTest/KafkaTest.java | 85 +++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 src/main/java/UnitTest/KafkaTest.java diff --git a/pom.xml b/pom.xml index 2d9257d..628ba1e 100644 --- a/pom.xml +++ b/pom.xml @@ -278,6 +278,19 @@ rocketmq-client 4.9.4 + + + + org.apache.kafka + kafka-clients + 3.4.0 + + + + org.apache.kafka + kafka-streams + 3.4.0 + diff --git a/src/main/java/UnitTest/KafkaTest.java b/src/main/java/UnitTest/KafkaTest.java new file mode 100644 index 0000000..de5da1f --- /dev/null +++ b/src/main/java/UnitTest/KafkaTest.java @@ -0,0 +1,85 @@ +package UnitTest; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Collections; +import java.util.Date; +import java.util.Map; +import java.util.Properties; + +public class KafkaTest { + + /** + * 功能:获取kafka的时间戳 + * + * @param dateString + * @return + * @throws ParseException + */ + public static long getKafkaTimeStamp(String dateString) throws ParseException { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = sdf.parse(dateString); + long millis = date.getTime(); + long kafkaTimestamp = millis / 1000; + return kafkaTimestamp; + } + + public static void main(String[] args) throws ParseException { + + Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092"); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "dsideal-group"); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "dsideal1"); // 客户端 ID,用于标识消费者 + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + // 配置消费者客户端时间戳类型为server_time(服务端时间) + //props.setProperty(ConsumerConfig.TIMESTAMP_TYPE_CONFIG, "server_time"); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + + /* + // 订阅需要消费的Topic + consumer.subscribe(Collections.singletonList("test")); + // 消费消息 + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + String key = record.key(); + String value = record.value(); + long offset = record.offset(); + int partition = record.partition(); + System.out.printf("key=%s,value=%s,offset=%d,partition=%d\n", key, value, offset, partition); + // 处理收到的消息 + } + }*/ + + // 关闭Consumer对象 + //consumer.close(); + + Duration pollTimeout = Duration.ofMillis(100); + Map startOffsets = Collections.singletonMap(new TopicPartition("test", 0), 1651363200000L); + consumer.assign(startOffsets.keySet()); + Map offsetAndTimestamps = consumer.offsetsForTimes(startOffsets); + for (Map.Entry entry : offsetAndTimestamps.entrySet()) { + TopicPartition tp = entry.getKey(); + OffsetAndTimestamp offsetAndTimestamp = entry.getValue(); + if (offsetAndTimestamp != null) { + consumer.seek(tp, offsetAndTimestamp.offset()); + } + } + + while (true) { + ConsumerRecords records = consumer.poll(pollTimeout); + for (ConsumerRecord record : records) { + System.out.printf("topic=%s, partition=%s, offset=%d, timestamp=%s, key=%s, value=%s%n", + record.topic(), record.partition(), record.offset(), record.timestamp(), record.key(), record.value()); + } + } + } +}