From 1756bf07929b9cca825415bbf1364d9f7067d632 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Wed, 10 May 2023 14:11:16 +0800 Subject: [PATCH 01/11] 'commit' --- src/main/java/UnitTest/KafkaProductorTest.java | 2 +- src/main/java/UnitTest/kafkaConsumerTest.java | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/java/UnitTest/KafkaProductorTest.java b/src/main/java/UnitTest/KafkaProductorTest.java index 124037a..341de17 100644 --- a/src/main/java/UnitTest/KafkaProductorTest.java +++ b/src/main/java/UnitTest/KafkaProductorTest.java @@ -53,7 +53,7 @@ public class KafkaProductorTest { //指定发送分区 String TOPIC_NAME = "test"; ProducerRecord producerRecord = new ProducerRecord<>(TOPIC_NAME - , 0, "HuangHai_" + i, "黄海_" + i); + , 0, "HuangHai_" + i, "{\"id\":" + i + ",\"name\"=\"HuangHai_" + i + "\"}"); //等待消息发送成功的同步阻塞方法 RecordMetadata metadata = producer.send(producerRecord).get(); diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java index c338786..a800c5d 100644 --- a/src/main/java/UnitTest/kafkaConsumerTest.java +++ b/src/main/java/UnitTest/kafkaConsumerTest.java @@ -72,16 +72,14 @@ public class kafkaConsumerTest { consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/ //指定offset消费 - /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); - consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/ + consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); + consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095);//注意:这里的包含3095的!!!!! //从指定时间点开始消费 List topicPartitions = consumer.partitionsFor(TOPIC_NAME); - - //从1小时前开始消费 -// long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; - + /* + //一般使用offset就可以满足所有需求了,不需要使用按时间点 //从某个时间点获取之后的数据 String dateString = "2023-05-10 08:38:00"; SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -111,6 +109,7 @@ public class kafkaConsumerTest { consumer.seek(key, offset); } } + */ while (true) { @@ -119,6 +118,8 @@ public class kafkaConsumerTest { */ ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { + //kafka中的offset是一个64位的有符号整数 + //64位的有符号整数的最大值9223372036854775807 System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s ,timestamp = %d%n", record.partition(), record.offset(), record.key(), record.value(), record.timestamp()); } From c54d47dea823430321958f635c2e0885a871580e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Wed, 10 May 2023 14:46:05 +0800 Subject: [PATCH 02/11] 'commit' --- src/main/java/UnitTest/KafkaProductorTest.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/main/java/UnitTest/KafkaProductorTest.java b/src/main/java/UnitTest/KafkaProductorTest.java index 341de17..3c70959 100644 --- a/src/main/java/UnitTest/KafkaProductorTest.java +++ b/src/main/java/UnitTest/KafkaProductorTest.java @@ -56,9 +56,19 @@ public class KafkaProductorTest { , 0, "HuangHai_" + i, "{\"id\":" + i + ",\"name\"=\"HuangHai_" + i + "\"}"); //等待消息发送成功的同步阻塞方法 - RecordMetadata metadata = producer.send(producerRecord).get(); - System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" - + metadata.partition() + "|offset-" + metadata.offset() + "|" + i); + //RecordMetadata metadata = producer.send(producerRecord).get(); + //System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + //+ metadata.partition() + "|offset-" + metadata.offset() + "|" + i); + + // 异步发送消息 + producer.send(producerRecord, (metadata, exception) -> { + if (exception != null) { + System.err.println("异步发送消息失败:" + exception.getMessage()); + } else { + System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + + metadata.partition() + "|offset-" + metadata.offset()); + } + }); } producer.close(); } From e5b668fee1ad597b69e76998fb242a7e07e77c37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Wed, 10 May 2023 15:21:27 +0800 Subject: [PATCH 03/11] 'commit' --- .../{Consumer.java => RocketMqConsumer.java} | 2 +- src/main/java/UnitTest/kafkaConsumerTest.java | 84 +++++++++++++------ src/main/java/UnitTest/t_sync_log.sql | 34 ++++++++ 3 files changed, 94 insertions(+), 26 deletions(-) rename src/main/java/UnitTest/{Consumer.java => RocketMqConsumer.java} (94%) create mode 100644 src/main/java/UnitTest/t_sync_log.sql diff --git a/src/main/java/UnitTest/Consumer.java b/src/main/java/UnitTest/RocketMqConsumer.java similarity index 94% rename from src/main/java/UnitTest/Consumer.java rename to src/main/java/UnitTest/RocketMqConsumer.java index b624d3d..70537d6 100644 --- a/src/main/java/UnitTest/Consumer.java +++ b/src/main/java/UnitTest/RocketMqConsumer.java @@ -8,7 +8,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; /** * @Date 2021-04-20 15:47 */ -public class Consumer { +public class RocketMqConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group"); consumer.setNamesrvAddr("10.10.14.231:9876"); diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java index a800c5d..f06fba2 100644 --- a/src/main/java/UnitTest/kafkaConsumerTest.java +++ b/src/main/java/UnitTest/kafkaConsumerTest.java @@ -1,10 +1,21 @@ package UnitTest; +import com.alibaba.fastjson.JSONObject; +import com.aspose.slides.Collections.ArrayList; +import com.dsideal.FengHuang.Util.CommonUtil; +import com.jfinal.kit.PathKit; +import com.jfinal.kit.PropKit; +import com.jfinal.plugin.activerecord.ActiveRecordPlugin; +import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.Record; +import com.jfinal.plugin.druid.DruidPlugin; +import com.jfinal.plugin.redis.RedisPlugin; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; +import java.io.File; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Duration; @@ -56,24 +67,31 @@ public class kafkaConsumerTest { } public static void main(String[] args) throws ParseException { - String TOPIC_NAME = "test"; - KafkaConsumer consumer = new KafkaConsumer<>(initConfig()); - //如果使用自动分区分配,那就不要使用手动分区分配,反之亦然,两者不可同时使用,使用一种方法,就注释掉另一种方法。 - //如使用手动分区分配,那么注释掉自动分区分配。 - //https://blog.csdn.net/qq_41413743/article/details/123636586 - // consumer.subscribe(Arrays.asList(TOPIC_NAME)); + //1、配置数据库 + PropKit.use("application.properties"); + + DruidPlugin druid = new DruidPlugin(PropKit.get("jdbcUrl"), PropKit.get("user"), + PropKit.get("password").trim(), PropKit.get("driverClassName")); + druid.start(); + // 配置ActiveRecord插件 + ActiveRecordPlugin arp = new ActiveRecordPlugin(druid); + arp.start(); + + //2、读取Kafka + String TOPIC_NAME = "maxwell"; + KafkaConsumer consumer = new KafkaConsumer<>(initConfig()); // 消费指定分区 consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); //消息回溯消费 - /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); - consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/ + consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); + consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); //指定offset消费 - consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); - consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095);//注意:这里的包含3095的!!!!! + //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); + //consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095);//注意:这里的包含3095的!!!!! //从指定时间点开始消费 List topicPartitions = consumer.partitionsFor(TOPIC_NAME); @@ -111,34 +129,50 @@ public class kafkaConsumerTest { } */ - while (true) { - /* - * poll() API 是拉取消息的长轮询 - */ - ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + List writeList = new ArrayList(); + List delList = new ArrayList(); + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { //kafka中的offset是一个64位的有符号整数 //64位的有符号整数的最大值9223372036854775807 - System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s ,timestamp = %d%n", record.partition(), - record.offset(), record.key(), record.value(), record.timestamp()); + Record r = new Record(); + r.set("offset", record.offset()); + r.set("timestamp", record.timestamp()); + JSONObject jo = JSONObject.parseObject(record.value()); + r.set("database", jo.get("database")); + r.set("table", jo.get("table")); + r.set("type", jo.get("type")); + writeList.add(r); + delList.add(record.offset()); } if (records.count() > 0) { // 手动同步提交offset,当前线程会阻塞直到offset提交成功 // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了 - consumer.commitSync(); + //consumer.commitSync(); + System.out.println("将插入数据条数:"+writeList.size()); // 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑 - /*consumer.commitAsync(new OffsetCommitCallback() { - @Override - public void onComplete(Map offsets, Exception exception) { - if (exception != null) { - System.err.println("Commit failed for " + offsets); - System.err.println("Commit failed exception: " + exception.getStackTrace()); + consumer.commitAsync((offsets, exception) -> { + if (exception != null) { + System.err.println("Commit failed for " + offsets); + System.err.println("Commit failed exception: " + exception.getStackTrace()); + } else { + if (writeList.size() > 0) { + String str = ""; + for (Long aLong : delList) { + str = str + "," + aLong; + } + if (str.length() > 0) { + str = str.substring(1); + String sql = "delete from t_sync_log where offset in (" + str + ")"; + Db.update(sql); + } + Db.batchSave("t_sync_log", writeList, 100); } } - });*/ + }); } } } diff --git a/src/main/java/UnitTest/t_sync_log.sql b/src/main/java/UnitTest/t_sync_log.sql new file mode 100644 index 0000000..5b3138d --- /dev/null +++ b/src/main/java/UnitTest/t_sync_log.sql @@ -0,0 +1,34 @@ +/* + Navicat Premium Data Transfer + + Source Server : 10.10.14.169 + Source Server Type : MySQL + Source Server Version : 100505 (10.5.5-MariaDB-log) + Source Host : 10.10.14.169:22066 + Source Schema : ccdjzswd_db + + Target Server Type : MySQL + Target Server Version : 100505 (10.5.5-MariaDB-log) + File Encoding : 65001 + + Date: 10/05/2023 15:21:01 +*/ + +SET NAMES utf8mb4; +SET FOREIGN_KEY_CHECKS = 0; + +-- ---------------------------- +-- Table structure for t_sync_log +-- ---------------------------- +DROP TABLE IF EXISTS `t_sync_log`; +CREATE TABLE `t_sync_log` ( + `offset` bigint NOT NULL, + `timestamp` bigint NOT NULL, + `database` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `table` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + PRIMARY KEY (`offset`) USING BTREE, + INDEX `database`(`database` ASC, `table` ASC, `type` ASC) USING BTREE +) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; + +SET FOREIGN_KEY_CHECKS = 1; From 993e685d374ba6b3de122996e62b128a08bbb622 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Wed, 10 May 2023 16:10:20 +0800 Subject: [PATCH 04/11] 'commit' --- src/main/java/UnitTest/kafkaConsumerTest.java | 41 +++++++------------ 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java index f06fba2..efaa484 100644 --- a/src/main/java/UnitTest/kafkaConsumerTest.java +++ b/src/main/java/UnitTest/kafkaConsumerTest.java @@ -1,25 +1,25 @@ package UnitTest; import com.alibaba.fastjson.JSONObject; -import com.aspose.slides.Collections.ArrayList; -import com.dsideal.FengHuang.Util.CommonUtil; -import com.jfinal.kit.PathKit; import com.jfinal.kit.PropKit; import com.jfinal.plugin.activerecord.ActiveRecordPlugin; import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.Record; import com.jfinal.plugin.druid.DruidPlugin; -import com.jfinal.plugin.redis.RedisPlugin; -import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; -import java.io.File; import java.text.ParseException; -import java.text.SimpleDateFormat; import java.time.Duration; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; public class kafkaConsumerTest { @@ -131,7 +131,7 @@ public class kafkaConsumerTest { while (true) { List writeList = new ArrayList(); - List delList = new ArrayList(); + List delList = new ArrayList(); ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { //kafka中的offset是一个64位的有符号整数 @@ -144,33 +144,20 @@ public class kafkaConsumerTest { r.set("table", jo.get("table")); r.set("type", jo.get("type")); writeList.add(r); - delList.add(record.offset()); + String sql = "delete from t_sync_log where offset = " + record.offset(); + delList.add(sql); } if (records.count() > 0) { - // 手动同步提交offset,当前线程会阻塞直到offset提交成功 - // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了 - //consumer.commitSync(); - - System.out.println("将插入数据条数:"+writeList.size()); + System.out.println("将插入数据条数:" + writeList.size()); // 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑 consumer.commitAsync((offsets, exception) -> { if (exception != null) { System.err.println("Commit failed for " + offsets); System.err.println("Commit failed exception: " + exception.getStackTrace()); } else { - if (writeList.size() > 0) { - String str = ""; - for (Long aLong : delList) { - str = str + "," + aLong; - } - if (str.length() > 0) { - str = str.substring(1); - String sql = "delete from t_sync_log where offset in (" + str + ")"; - Db.update(sql); - } - Db.batchSave("t_sync_log", writeList, 100); - } + if (delList.size() > 0) Db.batch(delList, 100); + if (writeList.size() > 0) Db.batchSave("t_sync_log", writeList, 100); } }); } From 036978b3bdbe1121c8a4726fdb7e7c6909ddb97f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Wed, 10 May 2023 16:11:52 +0800 Subject: [PATCH 05/11] 'commit' --- src/main/java/UnitTest/kafkaConsumerTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java index efaa484..29eb23e 100644 --- a/src/main/java/UnitTest/kafkaConsumerTest.java +++ b/src/main/java/UnitTest/kafkaConsumerTest.java @@ -56,10 +56,7 @@ public class kafkaConsumerTest { //一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); - /* - 如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱, - 会将其踢出消费组,将分区分配给别的consumer消费 - */ + //如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -150,7 +147,6 @@ public class kafkaConsumerTest { if (records.count() > 0) { System.out.println("将插入数据条数:" + writeList.size()); - // 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑 consumer.commitAsync((offsets, exception) -> { if (exception != null) { System.err.println("Commit failed for " + offsets); From d5ad5450996dab517842c22a11021534af8c4d6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Wed, 10 May 2023 16:14:56 +0800 Subject: [PATCH 06/11] 'commit' --- .../resource/ExcelExportTemplate/exam.json | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/resource/ExcelExportTemplate/exam.json b/src/main/resource/ExcelExportTemplate/exam.json index 5793061..59f567e 100644 --- a/src/main/resource/ExcelExportTemplate/exam.json +++ b/src/main/resource/ExcelExportTemplate/exam.json @@ -1,6 +1,6 @@ { - "title": "党建知识问答排名结果", - "sheetName": "结果表", + "title": "长春市教育局学习贯彻习近平新时代中国特色社会主义思想知识答题", + "sheetName": "结果", "titleHeight": 30, "rowHeight": 30, "showNumber": true, @@ -8,37 +8,37 @@ { "show_column_name": "姓名", "list_column_name": "person_name", - "width": 40 + "width": 30 }, { "show_column_name": "处室", "list_column_name": "ks", - "width": 40 + "width": 30 }, { "show_column_name": "电话", "list_column_name": "tel", - "width": 40 + "width": 30 }, { "show_column_name": "得分", "list_column_name": "score", - "width": 40 + "width": 20 }, { "show_column_name": "用时", "list_column_name": "ys", - "width": 40 + "width": 20 }, { "show_column_name": "开始时间", "list_column_name": "start_time", - "width": 40 + "width": 32 }, { "show_column_name": "交卷时间", "list_column_name": "end_time", - "width": 40 + "width": 32 } ] } \ No newline at end of file From 1dfb085d19616a4b1e32885966389cc3fb19d188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Wed, 10 May 2023 16:18:07 +0800 Subject: [PATCH 07/11] 'commit' --- src/main/resource/ExcelExportTemplate/exam.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resource/ExcelExportTemplate/exam.json b/src/main/resource/ExcelExportTemplate/exam.json index 59f567e..9830166 100644 --- a/src/main/resource/ExcelExportTemplate/exam.json +++ b/src/main/resource/ExcelExportTemplate/exam.json @@ -8,7 +8,7 @@ { "show_column_name": "姓名", "list_column_name": "person_name", - "width": 30 + "width": 20 }, { "show_column_name": "处室", From 5e0561f866dd22e8c56e8db30fe84c2bcb4147ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Thu, 11 May 2023 09:48:51 +0800 Subject: [PATCH 08/11] 'commit' --- pom.xml | 6 + src/main/java/UnitTest/HexDataSender.java | 111 +++++++++++++++++++ target/classes/ExcelExportTemplate/exam.json | 18 +-- 3 files changed, 126 insertions(+), 9 deletions(-) create mode 100644 src/main/java/UnitTest/HexDataSender.java diff --git a/pom.xml b/pom.xml index bc3619a..cbd8781 100644 --- a/pom.xml +++ b/pom.xml @@ -285,6 +285,12 @@ kafka-clients 3.4.0 + + + io.netty + netty-all + 4.1.32.Final + diff --git a/src/main/java/UnitTest/HexDataSender.java b/src/main/java/UnitTest/HexDataSender.java new file mode 100644 index 0000000..f717434 --- /dev/null +++ b/src/main/java/UnitTest/HexDataSender.java @@ -0,0 +1,111 @@ +package UnitTest; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.MessageToByteEncoder; + +public class HexDataSender { + private final String host; + private final int port; + + public HexDataSender(String host, int port) { + this.host = host; + this.port = port; + } + + public void sendHexData(String hexData) throws Exception { + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + Bootstrap b = new Bootstrap(); + b.group(workerGroup); + b.channel(NioSocketChannel.class); + b.option(ChannelOption.SO_KEEPALIVE, true); + b.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HexDataEncoder()); + p.addLast(new HexDataSenderHandler(hexData)); + } + }); + + // Start the client. + ChannelFuture f = b.connect(host, port).sync(); + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + } + } + + public static String IP = "10.10.21.18"; + public static int PORT = 8001; + + public static void Open(String sb_id) throws Exception { + //开 + String hexData = "16 00 34 F5 41 11 FE 82 0D 02 ? 00 00 00 00 00 00 01 00 00 01".replace("?", sb_id).replace(" ", ""); + HexDataSender sender = new HexDataSender(IP, PORT); + sender.sendHexData(hexData); + } + + public static void Close(String sb_id) throws Exception { + //关 + String hexData = "16 00 34 F5 41 11 FE 82 0D 02 ? 00 00 00 00 00 00 01 00 00 00".replace("?", sb_id).replace(" ", ""); + HexDataSender sender = new HexDataSender(IP, PORT); + sender.sendHexData(hexData); + } + + public static void main(String[] args) throws Exception { + Close("6D FF"); + //Thread.sleep(3000); + //Open("6D FF"); + } +} + +class HexDataSenderHandler extends ChannelInboundHandlerAdapter { + private final String hexData; + + public HexDataSenderHandler(String hexData) { + this.hexData = hexData; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + byte[] bytes = hexStringToByteArray(hexData); + ByteBuf buffer = Unpooled.buffer(bytes.length); + buffer.writeBytes(bytes); + ctx.writeAndFlush(buffer); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + + private byte[] hexStringToByteArray(String hexData) { + int len = hexData.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(hexData.charAt(i), 16) << 4) + + Character.digit(hexData.charAt(i + 1), 16)); + } + return data; + } +} + +class HexDataEncoder extends MessageToByteEncoder { + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { + byte[] bytes = new byte[msg.readableBytes()]; + msg.getBytes(msg.readerIndex(), bytes); + out.writeBytes(bytes); + } +} diff --git a/target/classes/ExcelExportTemplate/exam.json b/target/classes/ExcelExportTemplate/exam.json index 5793061..9830166 100644 --- a/target/classes/ExcelExportTemplate/exam.json +++ b/target/classes/ExcelExportTemplate/exam.json @@ -1,6 +1,6 @@ { - "title": "党建知识问答排名结果", - "sheetName": "结果表", + "title": "长春市教育局学习贯彻习近平新时代中国特色社会主义思想知识答题", + "sheetName": "结果", "titleHeight": 30, "rowHeight": 30, "showNumber": true, @@ -8,37 +8,37 @@ { "show_column_name": "姓名", "list_column_name": "person_name", - "width": 40 + "width": 20 }, { "show_column_name": "处室", "list_column_name": "ks", - "width": 40 + "width": 30 }, { "show_column_name": "电话", "list_column_name": "tel", - "width": 40 + "width": 30 }, { "show_column_name": "得分", "list_column_name": "score", - "width": 40 + "width": 20 }, { "show_column_name": "用时", "list_column_name": "ys", - "width": 40 + "width": 20 }, { "show_column_name": "开始时间", "list_column_name": "start_time", - "width": 40 + "width": 32 }, { "show_column_name": "交卷时间", "list_column_name": "end_time", - "width": 40 + "width": 32 } ] } \ No newline at end of file From a023089b9d8880f4790660aeb19f9f7d7481b37e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Thu, 11 May 2023 11:02:15 +0800 Subject: [PATCH 09/11] 'commit' --- pom.xml | 2 +- src/main/java/UnitTest/HexDataSender.java | 18 +++--- src/main/java/UnitTest/HexStringSender.java | 70 +++++++++++++++++++++ 3 files changed, 80 insertions(+), 10 deletions(-) create mode 100644 src/main/java/UnitTest/HexStringSender.java diff --git a/pom.xml b/pom.xml index cbd8781..05ed064 100644 --- a/pom.xml +++ b/pom.xml @@ -289,7 +289,7 @@ io.netty netty-all - 4.1.32.Final + 4.1.92.Final diff --git a/src/main/java/UnitTest/HexDataSender.java b/src/main/java/UnitTest/HexDataSender.java index f717434..5d60801 100644 --- a/src/main/java/UnitTest/HexDataSender.java +++ b/src/main/java/UnitTest/HexDataSender.java @@ -29,15 +29,15 @@ public class HexDataSender { b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { - ChannelPipeline p = ch.pipeline(); - p.addLast(new HexDataEncoder()); - p.addLast(new HexDataSenderHandler(hexData)); + //ChannelPipeline p = ch.pipeline(); + //p.addLast(new HexDataEncoder()); + //p.addLast(new HexDataSenderHandler(hexData)); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); - + f.channel().writeAndFlush(Unpooled.copiedBuffer(hexData.getBytes())).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { @@ -48,9 +48,9 @@ public class HexDataSender { public static String IP = "10.10.21.18"; public static int PORT = 8001; - public static void Open(String sb_id) throws Exception { + public static void Open() throws Exception { //开 - String hexData = "16 00 34 F5 41 11 FE 82 0D 02 ? 00 00 00 00 00 00 01 00 00 01".replace("?", sb_id).replace(" ", ""); + String hexData = "16 00 34 F5 41 11 FE 82 0D 02 6D FF 00 00 00 00 00 00 01 00 00 01".replace(" ",""); HexDataSender sender = new HexDataSender(IP, PORT); sender.sendHexData(hexData); } @@ -63,9 +63,9 @@ public class HexDataSender { } public static void main(String[] args) throws Exception { - Close("6D FF"); - //Thread.sleep(3000); - //Open("6D FF"); + //Close("6D FF"); + + Open(); } } diff --git a/src/main/java/UnitTest/HexStringSender.java b/src/main/java/UnitTest/HexStringSender.java new file mode 100644 index 0000000..f6292ad --- /dev/null +++ b/src/main/java/UnitTest/HexStringSender.java @@ -0,0 +1,70 @@ +package UnitTest; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +public class HexStringSender { + public static String IP = "10.10.21.18"; + public static int PORT = 8001; + + //# 开 + public static String open_hex_str = "16 00 34 F5 41 11 FE 82 0D 02 6D FF 00 00 00 00 00 00 01 00 00 01"; + + + //关 + public static String close_hex_str = "16 00 34 F5 41 11 FE 82 0D 02 6D FF 00 00 00 00 00 00 01 00 00 00"; + + public static void main(String[] args) throws IOException, InterruptedException { + + for (int i = 1; ; i++) { + Socket socket = new Socket(IP, PORT); + String hex_str; + if (i % 2 == 0) hex_str = close_hex_str; + else hex_str = open_hex_str; + byte[] bytes = hexStringToByteArray(hex_str); + OutputStream os = socket.getOutputStream(); + os.write(bytes); + socket.close(); + + Thread.sleep(2000); + } + + } + + /** + * 16进制表示的字符串转换为字节数组 + * + * @param hexString 16进制表示的字符串 + * @return byte[] 字节数组 + */ + public static byte[] hexStringToByteArray(String hexString) { + hexString = hexString.replaceAll(" ", ""); + int len = hexString.length(); + byte[] bytes = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + // 两位一组,表示一个字节,把这样表示的16进制字符串,还原成一个字节 + bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + Character + .digit(hexString.charAt(i + 1), 16)); + } + return bytes; + } + + /** + * 将字节数组转换成十六进制的字符串 + * + * @return + */ + public static String BinaryToHexString(byte[] bytes) { + String hexStr = "0123456789ABCDEF"; + String result = ""; + String hex = ""; + for (byte b : bytes) { + hex = String.valueOf(hexStr.charAt((b & 0xF0) >> 4)); + hex += String.valueOf(hexStr.charAt(b & 0x0F)); + result += hex + " "; + } + return result; + } +} From 8e3b8d57f528a48bc03bae2025beab37daa31507 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Thu, 11 May 2023 11:19:04 +0800 Subject: [PATCH 10/11] 'commit' --- src/main/java/UnitTest/HexStringSender.java | 6 --- src/main/java/UnitTest/SocketUtilTest.java | 30 ++++++++++++++ .../dsideal/FengHuang/Util/SocketUtil.java} | 39 +++++-------------- 3 files changed, 39 insertions(+), 36 deletions(-) create mode 100644 src/main/java/UnitTest/SocketUtilTest.java rename src/main/java/{UnitTest/HexDataSender.java => com/dsideal/FengHuang/Util/SocketUtil.java} (65%) diff --git a/src/main/java/UnitTest/HexStringSender.java b/src/main/java/UnitTest/HexStringSender.java index f6292ad..28c97fa 100644 --- a/src/main/java/UnitTest/HexStringSender.java +++ b/src/main/java/UnitTest/HexStringSender.java @@ -1,23 +1,18 @@ package UnitTest; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; public class HexStringSender { public static String IP = "10.10.21.18"; public static int PORT = 8001; - //# 开 public static String open_hex_str = "16 00 34 F5 41 11 FE 82 0D 02 6D FF 00 00 00 00 00 00 01 00 00 01"; - - //关 public static String close_hex_str = "16 00 34 F5 41 11 FE 82 0D 02 6D FF 00 00 00 00 00 00 01 00 00 00"; public static void main(String[] args) throws IOException, InterruptedException { - for (int i = 1; ; i++) { Socket socket = new Socket(IP, PORT); String hex_str; @@ -30,7 +25,6 @@ public class HexStringSender { Thread.sleep(2000); } - } /** diff --git a/src/main/java/UnitTest/SocketUtilTest.java b/src/main/java/UnitTest/SocketUtilTest.java new file mode 100644 index 0000000..d003f24 --- /dev/null +++ b/src/main/java/UnitTest/SocketUtilTest.java @@ -0,0 +1,30 @@ +package UnitTest; + +import com.dsideal.FengHuang.Util.SocketUtil; + +public class SocketUtilTest { + + public static void Open(String host, int port, String sb_id) throws Exception { + //开 + String hexData = "16 00 34 F5 41 11 FE 82 0D 02 ? 00 00 00 00 00 00 01 00 00 01".replace("?", sb_id).replace(" ", ""); + SocketUtil sender = new SocketUtil(host, port); + sender.sendHexData(hexData); + } + + public static void Close(String host, int port, String sb_id) throws Exception { + //关 + String hexData = "16 00 34 F5 41 11 FE 82 0D 02 ? 00 00 00 00 00 00 01 00 00 00".replace("?", sb_id).replace(" ", ""); + SocketUtil sender = new SocketUtil(host, port); + sender.sendHexData(hexData); + } + public static void main(String[] args) throws Exception { + String HOST = "10.10.21.18"; + int PORT = 8001; + String sb_id = "6D FF"; + for (int i = 1; ; i++) { + if (i % 2 == 1) Open(HOST, PORT, sb_id); + else Close(HOST, PORT, sb_id); + Thread.sleep(1000); + } + } +} diff --git a/src/main/java/UnitTest/HexDataSender.java b/src/main/java/com/dsideal/FengHuang/Util/SocketUtil.java similarity index 65% rename from src/main/java/UnitTest/HexDataSender.java rename to src/main/java/com/dsideal/FengHuang/Util/SocketUtil.java index 5d60801..c8d61c6 100644 --- a/src/main/java/UnitTest/HexDataSender.java +++ b/src/main/java/com/dsideal/FengHuang/Util/SocketUtil.java @@ -1,4 +1,4 @@ -package UnitTest; +package com.dsideal.FengHuang.Util; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; @@ -9,11 +9,11 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.MessageToByteEncoder; -public class HexDataSender { +public class SocketUtil { private final String host; private final int port; - public HexDataSender(String host, int port) { + public SocketUtil(String host, int port) { this.host = host; this.port = port; } @@ -29,44 +29,23 @@ public class HexDataSender { b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { - //ChannelPipeline p = ch.pipeline(); - //p.addLast(new HexDataEncoder()); - //p.addLast(new HexDataSenderHandler(hexData)); + ChannelPipeline p = ch.pipeline(); + p.addLast(new HexDataEncoder()); + p.addLast(new HexDataSenderHandler(hexData)); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); - f.channel().writeAndFlush(Unpooled.copiedBuffer(hexData.getBytes())).sync(); // Wait until the connection is closed. - f.channel().closeFuture().sync(); + //f.channel().closeFuture().sync(); + //手动关闭 + f.channel().close(); } finally { workerGroup.shutdownGracefully(); } } - public static String IP = "10.10.21.18"; - public static int PORT = 8001; - - public static void Open() throws Exception { - //开 - String hexData = "16 00 34 F5 41 11 FE 82 0D 02 6D FF 00 00 00 00 00 00 01 00 00 01".replace(" ",""); - HexDataSender sender = new HexDataSender(IP, PORT); - sender.sendHexData(hexData); - } - - public static void Close(String sb_id) throws Exception { - //关 - String hexData = "16 00 34 F5 41 11 FE 82 0D 02 ? 00 00 00 00 00 00 01 00 00 00".replace("?", sb_id).replace(" ", ""); - HexDataSender sender = new HexDataSender(IP, PORT); - sender.sendHexData(hexData); - } - - public static void main(String[] args) throws Exception { - //Close("6D FF"); - - Open(); - } } class HexDataSenderHandler extends ChannelInboundHandlerAdapter { From 9e03fb2c59397e0498401fda3802e415dd617f0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Thu, 11 May 2023 11:38:28 +0800 Subject: [PATCH 11/11] 'commit' --- src/main/java/UnitTest/SocketUtilTest.java | 10 +++ .../dsideal/FengHuang/Util/SocketUtil.java | 84 ++++++++++++------- 2 files changed, 63 insertions(+), 31 deletions(-) diff --git a/src/main/java/UnitTest/SocketUtilTest.java b/src/main/java/UnitTest/SocketUtilTest.java index d003f24..a2eaafd 100644 --- a/src/main/java/UnitTest/SocketUtilTest.java +++ b/src/main/java/UnitTest/SocketUtilTest.java @@ -17,6 +17,15 @@ public class SocketUtilTest { SocketUtil sender = new SocketUtil(host, port); sender.sendHexData(hexData); } + + public static void getList(String host, int port) throws Exception { + //String gateWay = "1141f534"; + String hexData = "08 00 34 f5 41 11 FE 81"; + hexData=hexData.replace(" ",""); + SocketUtil sender = new SocketUtil(host, port); + sender.sendHexData(hexData); + } + public static void main(String[] args) throws Exception { String HOST = "10.10.21.18"; int PORT = 8001; @@ -26,5 +35,6 @@ public class SocketUtilTest { else Close(HOST, PORT, sb_id); Thread.sleep(1000); } +// getList(HOST,PORT); } } diff --git a/src/main/java/com/dsideal/FengHuang/Util/SocketUtil.java b/src/main/java/com/dsideal/FengHuang/Util/SocketUtil.java index c8d61c6..966cbcb 100644 --- a/src/main/java/com/dsideal/FengHuang/Util/SocketUtil.java +++ b/src/main/java/com/dsideal/FengHuang/Util/SocketUtil.java @@ -9,6 +9,8 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.MessageToByteEncoder; +import java.nio.charset.StandardCharsets; + public class SocketUtil { private final String host; private final int port; @@ -32,6 +34,7 @@ public class SocketUtil { ChannelPipeline p = ch.pipeline(); p.addLast(new HexDataEncoder()); p.addLast(new HexDataSenderHandler(hexData)); + p.addLast(new ClientHandler()); // 添加客户端 Handler } }); @@ -46,45 +49,64 @@ public class SocketUtil { } } -} + class HexDataSenderHandler extends ChannelInboundHandlerAdapter { + private final String hexData; -class HexDataSenderHandler extends ChannelInboundHandlerAdapter { - private final String hexData; + public HexDataSenderHandler(String hexData) { + this.hexData = hexData; + } - public HexDataSenderHandler(String hexData) { - this.hexData = hexData; - } + @Override + public void channelActive(ChannelHandlerContext ctx) { + byte[] bytes = hexStringToByteArray(hexData); + ByteBuf buffer = Unpooled.buffer(bytes.length); + buffer.writeBytes(bytes); + ctx.writeAndFlush(buffer); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + + private byte[] hexStringToByteArray(String hexData) { + int len = hexData.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(hexData.charAt(i), 16) << 4) + + Character.digit(hexData.charAt(i + 1), 16)); + } + return data; + } - @Override - public void channelActive(ChannelHandlerContext ctx) { - byte[] bytes = hexStringToByteArray(hexData); - ByteBuf buffer = Unpooled.buffer(bytes.length); - buffer.writeBytes(bytes); - ctx.writeAndFlush(buffer); - } - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); - ctx.close(); } - private byte[] hexStringToByteArray(String hexData) { - int len = hexData.length(); - byte[] data = new byte[len / 2]; - for (int i = 0; i < len; i += 2) { - data[i / 2] = (byte) ((Character.digit(hexData.charAt(i), 16) << 4) + - Character.digit(hexData.charAt(i + 1), 16)); + static class ClientHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + // 处理服务器响应数据 + byte[] bytes = (byte[]) msg; + String response = new String(bytes, StandardCharsets.US_ASCII); + System.out.println("Server response: " + response); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); } - return data; } -} -class HexDataEncoder extends MessageToByteEncoder { - @Override - protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { - byte[] bytes = new byte[msg.readableBytes()]; - msg.getBytes(msg.readerIndex(), bytes); - out.writeBytes(bytes); + static class HexDataEncoder extends MessageToByteEncoder { + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { + byte[] bytes = new byte[msg.readableBytes()]; + msg.getBytes(msg.readerIndex(), bytes); + out.writeBytes(bytes); + } } + }