main
kgdxpr 2 years ago
parent fc404c81c8
commit 83ffda83fb

@ -49,7 +49,7 @@ public class KafkaProductorTest {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Producer<String, String> producer = new KafkaProducer<>(initConfig()); Producer<String, String> producer = new KafkaProducer<>(initConfig());
for (int i = 1; i <= 10; i++) { for (int i = 1; i <= 1000; i++) {
//指定发送分区 //指定发送分区
String TOPIC_NAME = "test"; String TOPIC_NAME = "test";
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME

@ -32,10 +32,10 @@ public class kafkaConsumerTest {
earliestoffsetconsumer.seekToBeginning() earliestoffsetconsumer.seekToBeginning()
*/ */
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
/* /*
consumerbrokerbrokerrebalance consumerbrokerbrokerrebalance
rebalanceconsumer rebalanceconsumer
*/ */
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
/* /*
brokerconsumer brokerconsumer
@ -78,9 +78,21 @@ public class kafkaConsumerTest {
//从指定时间点开始消费 //从指定时间点开始消费
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME); List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
/*
//从1小时前开始消费 //从1小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; // long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
//从某个时间点获取之后的数据
String dateString = "2023-05-10 08:38:00";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date startDate = null;
try {
startDate = dateFormat.parse(dateString);
} catch (ParseException e) {
e.printStackTrace();
}
long fetchDataTime = startDate.getTime();
Map<TopicPartition, Long> map = new HashMap<>(); Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) { for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime); map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
@ -99,7 +111,7 @@ public class kafkaConsumerTest {
consumer.seek(key, offset); consumer.seek(key, offset);
} }
} }
*/
while (true) { while (true) {
/* /*
@ -107,8 +119,8 @@ public class kafkaConsumerTest {
*/ */
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) { for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), System.out.printf("收到消息partition = %d,offset = %d, key = %s, value = %s ,timestamp = %d%n", record.partition(),
record.offset(), record.key(), record.value()); record.offset(), record.key(), record.value(), record.timestamp());
} }
if (records.count() > 0) { if (records.count() > 0) {

Loading…
Cancel
Save