diff --git a/src/main/java/UnitTest/KafkaProductorTest.java b/src/main/java/UnitTest/KafkaProductorTest.java index 9657f54..124037a 100644 --- a/src/main/java/UnitTest/KafkaProductorTest.java +++ b/src/main/java/UnitTest/KafkaProductorTest.java @@ -49,7 +49,7 @@ public class KafkaProductorTest { public static void main(String[] args) throws Exception { Producer producer = new KafkaProducer<>(initConfig()); - for (int i = 1; i <= 10; i++) { + for (int i = 1; i <= 1000; i++) { //指定发送分区 String TOPIC_NAME = "test"; ProducerRecord producerRecord = new ProducerRecord<>(TOPIC_NAME diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java index be44251..c338786 100644 --- a/src/main/java/UnitTest/kafkaConsumerTest.java +++ b/src/main/java/UnitTest/kafkaConsumerTest.java @@ -32,10 +32,10 @@ public class kafkaConsumerTest { earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费) */ //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - /* - consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将 - rebalance方案下发给consumer,这个时间可以稍微短一点 - */ + /* + consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将 + rebalance方案下发给consumer,这个时间可以稍微短一点 + */ props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); /* 服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组, @@ -78,9 +78,21 @@ public class kafkaConsumerTest { //从指定时间点开始消费 List topicPartitions = consumer.partitionsFor(TOPIC_NAME); - /* + //从1小时前开始消费 - long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; +// long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; + + //从某个时间点获取之后的数据 + String dateString = "2023-05-10 08:38:00"; + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date startDate = null; + try { + startDate = dateFormat.parse(dateString); + } catch (ParseException e) { + e.printStackTrace(); + } + + long fetchDataTime = startDate.getTime(); Map map = new HashMap<>(); for (PartitionInfo par : topicPartitions) { map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime); @@ -99,7 +111,7 @@ public class kafkaConsumerTest { consumer.seek(key, offset); } } - */ + while (true) { /* @@ -107,8 +119,8 @@ public class kafkaConsumerTest { */ ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { - System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), - record.offset(), record.key(), record.value()); + System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s ,timestamp = %d%n", record.partition(), + record.offset(), record.key(), record.value(), record.timestamp()); } if (records.count() > 0) {