From 1756bf07929b9cca825415bbf1364d9f7067d632 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Wed, 10 May 2023 14:11:16 +0800 Subject: [PATCH] Producer: send JSON-formatted values; consumer: seek to fixed offset 3095 --- src/main/java/UnitTest/KafkaProductorTest.java | 2 +- src/main/java/UnitTest/kafkaConsumerTest.java | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/java/UnitTest/KafkaProductorTest.java b/src/main/java/UnitTest/KafkaProductorTest.java index 124037a..341de17 100644 --- a/src/main/java/UnitTest/KafkaProductorTest.java +++ b/src/main/java/UnitTest/KafkaProductorTest.java @@ -53,7 +53,7 @@ public class KafkaProductorTest { //指定发送分区 String TOPIC_NAME = "test"; ProducerRecord producerRecord = new ProducerRecord<>(TOPIC_NAME - , 0, "HuangHai_" + i, "黄海_" + i); + , 0, "HuangHai_" + i, "{\"id\":" + i + ",\"name\":\"HuangHai_" + i + "\"}"); //等待消息发送成功的同步阻塞方法 RecordMetadata metadata = producer.send(producerRecord).get(); diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java index c338786..a800c5d 100644 --- a/src/main/java/UnitTest/kafkaConsumerTest.java +++ b/src/main/java/UnitTest/kafkaConsumerTest.java @@ -72,16 +72,14 @@ public class kafkaConsumerTest { consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/ //指定offset消费 - /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); - consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/ + consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); + consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095);//NOTE: seek(offset) is inclusive — consumption starts at offset 3095 
//从指定时间点开始消费 List topicPartitions = consumer.partitionsFor(TOPIC_NAME); - - //从1小时前开始消费 -// long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; - + /* + //一般使用offset就可以满足所有需求了,不需要使用按时间点 //从某个时间点获取之后的数据 String dateString = "2023-05-10 08:38:00"; SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -111,6 +109,7 @@ public class kafkaConsumerTest { consumer.seek(key, offset); } } + */ while (true) { @@ -119,6 +118,8 @@ public class kafkaConsumerTest { */ ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { + //kafka中的offset是一个64位的有符号整数 + //64位的有符号整数的最大值9223372036854775807 System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s ,timestamp = %d%n", record.partition(), record.offset(), record.key(), record.value(), record.timestamp()); }