main
黄海 2 years ago
parent 48b63fd6eb
commit 3b761b78a6

@ -279,18 +279,12 @@
<version>4.9.4</version>
</dependency>
<!--include kafak-->
<!--include kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
<build>
<plugins>

@ -0,0 +1,62 @@
package UnitTest;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProductorTest {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
/*
Message durability (acks) options:
(1) acks=0: the producer does not wait for any acknowledgement from the broker before sending
    the next message. Highest throughput, but messages are most easily lost.
(2) acks=1: the producer waits until the leader has written the message to its local log, but
    not for the followers. If the leader fails before a follower has replicated it, the message is lost.
(3) acks=-1 or all: the producer waits until min.insync.replicas (default 1, usually configured >= 2)
    replicas have written the message. Strongest guarantee; generally used only when losing data
    is unacceptable.
*/
props.put(ProducerConfig.ACKS_CONFIG, "1");
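// A hedged alternative (not part of this commit): for the strongest durability described in (3)
// above, acks=all could be combined with a topic/broker-side setting of min.insync.replicas=2, e.g.:
// props.put(ProducerConfig.ACKS_CONFIG, "all");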
/*
If sending fails the producer retries; the default retry interval is 100ms.
Retries improve reliability but can produce duplicate messages (e.g. on network jitter),
so the receiving side should handle messages idempotently.
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//Retry interval
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
//Local send buffer: messages are first written to this buffer, which improves send performance. Default is 33554432 (32MB).
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
/*
The Kafka producer's local sender thread pulls messages from the buffer and sends them to the broker in batches.
This sets the batch size; the default is 16384 (16KB), i.e. a batch is sent once it reaches 16KB.
*/
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
/*
Default is 0: every message is sent immediately, which hurts throughput.
With a value around 10ms, a message first goes into a local batch; if the batch fills to 16KB within
10ms it is sent with the batch, otherwise the batch is sent anyway after 10ms so that send latency
does not grow too large.
*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
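// Rough illustration (assumed ~100-byte records, not from this commit): with batch.size=16384 and
// linger.ms=10, a batch is flushed when roughly 160 such records accumulate or after 10 ms,
// whichever happens first.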
//Serialize the message key from String to a byte array
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//Serialize the message value from String to a byte array
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(props);
int msgNum = 5;
for (int i = 1; i <= msgNum; i++) {
//Send to a specific partition
String TOPIC_NAME="test";
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME
, 0, "bbb", "aaa");
//Synchronous send: block until the message is acknowledged
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset()+"|order-"+i);
}
producer.close();
}
}
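The test above sends synchronously by calling get() on the Future returned by send(). A minimal sketch of the non-blocking alternative using a callback is shown below; the class name and log text are illustrative, while the broker address and topic are reused from the test above.

package UnitTest;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaAsyncSendSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        Producer<String, String> producer = new KafkaProducer<>(props);
        // Asynchronous send: the callback runs on the producer's I/O thread once the broker responds
        producer.send(new ProducerRecord<>("test", "key", "value"), (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
            } else {
                System.out.println("Async send result: topic-" + metadata.topic()
                        + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
            }
        });
        // close() flushes pending records and waits for outstanding sends to complete
        producer.close();
    }
}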

@ -8,7 +8,7 @@ import java.util.UUID;
/**
* @Date 2021-04-20 15:40
*/
public class Producer {
public class RocketMqProducer {
public static void main(String[] args) throws Exception {
//Declare the producer

@ -6,6 +6,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
public class kafkaConsumerTest {
@ -25,39 +26,123 @@ public class kafkaConsumerTest {
return kafkaTimestamp;
}
protected static Properties initConfig(){
protected static Properties initConfig() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"dsideal-group");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dsideal-group");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "dsideal1");
return properties;
}
public static void main(String[] args) throws ParseException {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(initConfig());
String topic="test";
kafkaConsumer.subscribe(Arrays.asList(topic));
//Kafka's partition assignment happens inside poll(), so call poll() once before calling seek()
//Get the partitions currently assigned to this consumer
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
//Keep polling until partitions have been assigned
kafkaConsumer.poll(100L);
assignment = kafkaConsumer.assignment();
}
for (TopicPartition tp : assignment) {
//Consume the current partition starting from offset 10
kafkaConsumer.seek(tp, 10);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
String CONSUMER_GROUP_NAME="dsideal_group";
String TOPIC_NAME="test";
// Consumer group name
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// Whether to auto-commit offsets (default is true)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Auto-commit interval (only takes effect when auto-commit is enabled)
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
/*
What to do when the consumer group has no committed offset (a new group) or the committed offset no longer exists:
latest (default): consume only messages produced after this consumer started
earliest: consume from the beginning the first time, then continue from the committed offset;
this differs from consumer.seekToBeginning(), which rewinds to the beginning on every start
*/
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
/*
Interval at which the consumer sends heartbeats to the broker. If a rebalance is in progress,
the broker uses the heartbeat response to deliver the rebalance plan to the consumer, so this
can be fairly short.
*/
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
/*
If the broker receives no heartbeat from a consumer within this time, the consumer is considered
dead and removed from the group, and its partitions are reassigned to other consumers. Default is 10s.
*/
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
//Max records returned per poll; increase it if the consumer processes messages quickly, decrease it otherwise
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
/*
If the gap between two poll() calls exceeds this value, the broker considers the consumer too slow,
removes it from the group, and reassigns its partitions to other consumers.
*/
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// Consume a specific partition
//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//Replay messages from the beginning
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/
//Consume from a specific offset
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/
//Consume starting from a point in time
/*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//Consume starting from 1 hour ago
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() + "|offset-" + offset);
System.out.println();
//Use the timestamp to determine the offset to seek to
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}*/
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(2000L);
System.out.println("本次拉取的消息数量:" + consumerRecords.count());
System.out.println("消息集合是否为空:" + consumerRecords.isEmpty());
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("消费到的消息key:" + consumerRecord.key() + ",value:" + consumerRecord.value() + ",offset:" + consumerRecord.offset());
/*
 * The poll() API performs a long poll to fetch messages from the broker
 */
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
if (records.count() > 0) {
// Manually commit the offset synchronously; the current thread blocks until the commit succeeds
// Synchronous commit is the usual choice, since there is normally no further logic after the commit
consumer.commitSync();
// Manual asynchronous commit: the thread does not block and can continue with the following logic
/*consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
}
});*/
}
}
}
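The heartbeat and session-timeout settings above are what drive group rebalances. A minimal sketch of subscribing with a ConsumerRebalanceListener so the application can commit processed offsets when partitions are revoked is shown below; the class name is illustrative, while the broker address, group and topic are reused from this commit.

package UnitTest;
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaRebalanceListenerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.14.67:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dsideal_group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit what has been processed before the partitions are taken away
                System.out.println("Partitions revoked: " + partitions);
                consumer.commitSync();
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Partitions assigned: " + partitions);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, value = %s%n",
                        record.partition(), record.offset(), record.value());
            }
            if (records.count() > 0) {
                consumer.commitSync();
            }
        }
    }
}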
