From e5b668fee1ad597b69e76998fb242a7e07e77c37 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com>
Date: Wed, 10 May 2023 15:21:27 +0800
Subject: [PATCH] 'commit'

---
 .../{Consumer.java => RocketMqConsumer.java}  |  2 +-
 src/main/java/UnitTest/kafkaConsumerTest.java | 84 +++++++++++++------
 src/main/java/UnitTest/t_sync_log.sql         | 34 ++++++++
 3 files changed, 94 insertions(+), 26 deletions(-)
 rename src/main/java/UnitTest/{Consumer.java => RocketMqConsumer.java} (94%)
 create mode 100644 src/main/java/UnitTest/t_sync_log.sql

diff --git a/src/main/java/UnitTest/Consumer.java b/src/main/java/UnitTest/RocketMqConsumer.java
similarity index 94%
rename from src/main/java/UnitTest/Consumer.java
rename to src/main/java/UnitTest/RocketMqConsumer.java
index b624d3d..70537d6 100644
--- a/src/main/java/UnitTest/Consumer.java
+++ b/src/main/java/UnitTest/RocketMqConsumer.java
@@ -8,7 +8,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 /**
  * @Date 2021-04-20 15:47
  */
-public class Consumer {
+public class RocketMqConsumer {
     public static void main(String[] args) throws Exception {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group");
         consumer.setNamesrvAddr("10.10.14.231:9876");
diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java
index a800c5d..f06fba2 100644
--- a/src/main/java/UnitTest/kafkaConsumerTest.java
+++ b/src/main/java/UnitTest/kafkaConsumerTest.java
@@ -1,10 +1,21 @@
 package UnitTest;

+import com.alibaba.fastjson.JSONObject;
+import com.aspose.slides.Collections.ArrayList;
+import com.dsideal.FengHuang.Util.CommonUtil;
+import com.jfinal.kit.PathKit;
+import com.jfinal.kit.PropKit;
+import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
+import com.jfinal.plugin.activerecord.Db;
+import com.jfinal.plugin.activerecord.Record;
+import com.jfinal.plugin.druid.DruidPlugin;
+import com.jfinal.plugin.redis.RedisPlugin;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;

+import java.io.File;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
@@ -56,24 +67,31 @@ public class kafkaConsumerTest {
     }

     public static void main(String[] args) throws ParseException {
-        String TOPIC_NAME = "test";
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());
-        // Automatic and manual partition assignment are mutually exclusive: they cannot be combined,
-        // so pick one approach and comment out the other.
-        // https://blog.csdn.net/qq_41413743/article/details/123636586
-        // consumer.subscribe(Arrays.asList(TOPIC_NAME));
+        // 1. Configure the database
+        PropKit.use("application.properties");
+
+        DruidPlugin druid = new DruidPlugin(PropKit.get("jdbcUrl"), PropKit.get("user"),
+                PropKit.get("password").trim(), PropKit.get("driverClassName"));
+        druid.start();
+        // Configure the ActiveRecord plugin
+        ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
+        arp.start();
+
+        // 2. Read from Kafka
+        String TOPIC_NAME = "maxwell";
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());

         // Consume a specific partition
         consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

         // Rewind and re-consume from the beginning of the partition
-        /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
-        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/
+        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
+        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

         // Consume from a specified offset
-        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
-        consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095); // Note: offset 3095 itself is included!
+        //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
+        //consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095); // Note: offset 3095 itself is included!

         // Consume from a specified point in time
         List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
@@ -111,34 +129,50 @@ public class kafkaConsumerTest {
         }
         */

-        while (true) {
-            /*
-             * poll() is a long poll that fetches batches of messages
-             */
-            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+        List<Record> writeList = new ArrayList();
+        List<Long> delList = new ArrayList();
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
             for (ConsumerRecord<String, String> record : records) {
                 // In Kafka the offset is a 64-bit signed integer;
                 // its maximum value is 9223372036854775807
-                System.out.printf("Received message: partition = %d, offset = %d, key = %s, value = %s, timestamp = %d%n", record.partition(),
-                        record.offset(), record.key(), record.value(), record.timestamp());
+                Record r = new Record();
+                r.set("offset", record.offset());
+                r.set("timestamp", record.timestamp());
+                JSONObject jo = JSONObject.parseObject(record.value());
+                r.set("database", jo.get("database"));
+                r.set("table", jo.get("table"));
+                r.set("type", jo.get("type"));
+                writeList.add(r);
+                delList.add(record.offset());
             }
             if (records.count() > 0) {
                 // Commit the offset synchronously; the current thread blocks until the commit succeeds.
                 // Synchronous commit is the usual choice, since there is normally no further logic after it.
-                consumer.commitSync();
+                //consumer.commitSync();
+                System.out.println("Number of rows to insert: " + writeList.size());

                 // Commit the offset asynchronously; the current thread does not block and can continue with the remaining logic.
-                /*consumer.commitAsync(new OffsetCommitCallback() {
-                    @Override
-                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                        if (exception != null) {
-                            System.err.println("Commit failed for " + offsets);
-                            System.err.println("Commit failed exception: " + exception.getStackTrace());
+                consumer.commitAsync((offsets, exception) -> {
+                    if (exception != null) {
+                        System.err.println("Commit failed for " + offsets);
+                        System.err.println("Commit failed exception: " + exception.getStackTrace());
+                    } else {
+                        if (writeList.size() > 0) {
+                            String str = "";
+                            for (Long aLong : delList) {
+                                str = str + "," + aLong;
+                            }
+                            if (str.length() > 0) {
+                                str = str.substring(1);
+                                String sql = "delete from t_sync_log where offset in (" + str + ")";
+                                Db.update(sql);
+                            }
+                            Db.batchSave("t_sync_log", writeList, 100);
                         }
                     }
-                });*/
+                });
             }
         }
     }
diff --git a/src/main/java/UnitTest/t_sync_log.sql b/src/main/java/UnitTest/t_sync_log.sql
new file mode 100644
index 0000000..5b3138d
--- /dev/null
+++ b/src/main/java/UnitTest/t_sync_log.sql
@@ -0,0 +1,34 @@
+/*
+ Navicat Premium Data Transfer
+
+ Source Server         : 10.10.14.169
+ Source Server Type    : MySQL
+ Source Server Version : 100505 (10.5.5-MariaDB-log)
+ Source Host           : 10.10.14.169:22066
+ Source Schema         : ccdjzswd_db
+
+ Target Server Type    : MySQL
+ Target Server Version : 100505 (10.5.5-MariaDB-log)
+ File Encoding         : 65001
+
+ Date: 10/05/2023 15:21:01
+*/
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+
+-- ----------------------------
+-- Table structure for t_sync_log
+-- ----------------------------
+DROP TABLE IF EXISTS `t_sync_log`;
+CREATE TABLE `t_sync_log` (
+  `offset` bigint NOT NULL,
+  `timestamp` bigint NOT NULL,
+  `database` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
+  `table` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
+  `type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
+  PRIMARY KEY (`offset`) USING BTREE,
+  INDEX `database`(`database` ASC, `table` ASC, `type` ASC) USING BTREE
+) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
+
+SET FOREIGN_KEY_CHECKS = 1;
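
For every Maxwell row event read from the `maxwell` topic, the consumer above keeps only the Kafka offset and timestamp plus the event's `database`, `table` and `type` fields and writes them to `t_sync_log`. Below is a minimal, self-contained sketch of that per-record mapping, assuming a typical Maxwell JSON payload; the sample payload, table name and offset/timestamp literals are illustrative and not taken from the patch.

import com.alibaba.fastjson.JSONObject;
import com.jfinal.plugin.activerecord.Record;

public class MaxwellMappingSketch {
    public static void main(String[] args) {
        // Illustrative Maxwell row event; the field names follow Maxwell's JSON output,
        // the concrete values are made up for this example.
        String value = "{\"database\":\"ccdjzswd_db\",\"table\":\"t_demo\",\"type\":\"insert\",\"ts\":1683703287,\"data\":{\"id\":1}}";

        JSONObject jo = JSONObject.parseObject(value);

        // Same per-record mapping as in kafkaConsumerTest; in the real consumer the
        // offset and timestamp come from record.offset() and record.timestamp().
        Record r = new Record();
        r.set("offset", 3095L);
        r.set("timestamp", 1683703287000L);
        r.set("database", jo.get("database"));
        r.set("table", jo.get("table"));
        r.set("type", jo.get("type"));

        System.out.println(r.toJson());
    }
}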