main
黄海 2 years ago
parent c54d47dea8
commit e5b668fee1

@ -8,7 +8,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
/** /**
* @Date 2021-04-20 15:47 * @Date 2021-04-20 15:47
*/ */
public class Consumer { public class RocketMqConsumer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group");
consumer.setNamesrvAddr("10.10.14.231:9876"); consumer.setNamesrvAddr("10.10.14.231:9876");

@ -1,10 +1,21 @@
package UnitTest; package UnitTest;
import com.alibaba.fastjson.JSONObject;
import com.aspose.slides.Collections.ArrayList;
import com.dsideal.FengHuang.Util.CommonUtil;
import com.jfinal.kit.PathKit;
import com.jfinal.kit.PropKit;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.druid.DruidPlugin;
import com.jfinal.plugin.redis.RedisPlugin;
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.File;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.Duration; import java.time.Duration;
@ -56,24 +67,31 @@ public class kafkaConsumerTest {
} }
public static void main(String[] args) throws ParseException { public static void main(String[] args) throws ParseException {
String TOPIC_NAME = "test";
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());
//如果使用自动分区分配,那就不要使用手动分区分配,反之亦然,两者不可同时使用,使用一种方法,就注释掉另一种方法。 //1、配置数据库
//如使用手动分区分配,那么注释掉自动分区分配。 PropKit.use("application.properties");
//https://blog.csdn.net/qq_41413743/article/details/123636586
// consumer.subscribe(Arrays.asList(TOPIC_NAME)); DruidPlugin druid = new DruidPlugin(PropKit.get("jdbcUrl"), PropKit.get("user"),
PropKit.get("password").trim(), PropKit.get("driverClassName"));
druid.start();
// 配置ActiveRecord插件
ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
arp.start();
//2、读取Kafka
String TOPIC_NAME = "maxwell";
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());
// 消费指定分区 // 消费指定分区
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//消息回溯消费 //消息回溯消费
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/ consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//指定offset消费 //指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095);//注意这里的包含3095的 //consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095);//注意这里的包含3095的
//从指定时间点开始消费 //从指定时间点开始消费
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME); List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
@ -111,34 +129,50 @@ public class kafkaConsumerTest {
} }
*/ */
while (true) { while (true) {
/* List<Record> writeList = new ArrayList();
* poll() API List<Long> delList = new ArrayList();
*/ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) { for (ConsumerRecord<String, String> record : records) {
//kafka中的offset是一个64位的有符号整数 //kafka中的offset是一个64位的有符号整数
//64位的有符号整数的最大值9223372036854775807 //64位的有符号整数的最大值9223372036854775807
System.out.printf("收到消息partition = %d,offset = %d, key = %s, value = %s ,timestamp = %d%n", record.partition(), Record r = new Record();
record.offset(), record.key(), record.value(), record.timestamp()); r.set("offset", record.offset());
r.set("timestamp", record.timestamp());
JSONObject jo = JSONObject.parseObject(record.value());
r.set("database", jo.get("database"));
r.set("table", jo.get("table"));
r.set("type", jo.get("type"));
writeList.add(r);
delList.add(record.offset());
} }
if (records.count() > 0) { if (records.count() > 0) {
// 手动同步提交offset当前线程会阻塞直到offset提交成功 // 手动同步提交offset当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了 // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
consumer.commitSync(); //consumer.commitSync();
System.out.println("将插入数据条数:"+writeList.size());
// 手动异步提交offset当前线程提交offset不会阻塞可以继续处理后面的程序逻辑 // 手动异步提交offset当前线程提交offset不会阻塞可以继续处理后面的程序逻辑
/*consumer.commitAsync(new OffsetCommitCallback() { consumer.commitAsync((offsets, exception) -> {
@Override if (exception != null) {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { System.err.println("Commit failed for " + offsets);
if (exception != null) { System.err.println("Commit failed exception: " + exception.getStackTrace());
System.err.println("Commit failed for " + offsets); } else {
System.err.println("Commit failed exception: " + exception.getStackTrace()); if (writeList.size() > 0) {
String str = "";
for (Long aLong : delList) {
str = str + "," + aLong;
}
if (str.length() > 0) {
str = str.substring(1);
String sql = "delete from t_sync_log where offset in (" + str + ")";
Db.update(sql);
}
Db.batchSave("t_sync_log", writeList, 100);
} }
} }
});*/ });
} }
} }
} }

@ -0,0 +1,34 @@
/*
Navicat Premium Data Transfer
Source Server : 10.10.14.169
Source Server Type : MySQL
Source Server Version : 100505 (10.5.5-MariaDB-log)
Source Host : 10.10.14.169:22066
Source Schema : ccdjzswd_db
Target Server Type : MySQL
Target Server Version : 100505 (10.5.5-MariaDB-log)
File Encoding : 65001
Date: 10/05/2023 15:21:01
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for t_sync_log
-- ----------------------------
DROP TABLE IF EXISTS `t_sync_log`;
CREATE TABLE `t_sync_log` (
`offset` bigint NOT NULL,
`timestamp` bigint NOT NULL,
`database` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`table` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
PRIMARY KEY (`offset`) USING BTREE,
INDEX `database`(`database` ASC, `table` ASC, `type` ASC) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
Loading…
Cancel
Save