main
黄海 2 years ago
parent 83ffda83fb
commit 1756bf0792

@ -53,7 +53,7 @@ public class KafkaProductorTest {
//指定发送分区
String TOPIC_NAME = "test";
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME
, 0, "HuangHai_" + i, "黄海_" + i);
, 0, "HuangHai_" + i, "{\"id\":" + i + ",\"name\"=\"HuangHai_" + i + "\"}");
//等待消息发送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();

@ -72,16 +72,14 @@ public class kafkaConsumerTest {
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/
//指定offset消费
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095);//注意这里的包含3095的
//从指定时间点开始消费
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//从1小时前开始消费
// long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
/*
//一般使用offset就可以满足所有需求了不需要使用按时间点
//从某个时间点获取之后的数据
String dateString = "2023-05-10 08:38:00";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@ -111,6 +109,7 @@ public class kafkaConsumerTest {
consumer.seek(key, offset);
}
}
*/
while (true) {
@ -119,6 +118,8 @@ public class kafkaConsumerTest {
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//kafka中的offset是一个64位的有符号整数
//64位的有符号整数的最大值9223372036854775807
System.out.printf("收到消息partition = %d,offset = %d, key = %s, value = %s ,timestamp = %d%n", record.partition(),
record.offset(), record.key(), record.value(), record.timestamp());
}

Loading…
Cancel
Save