diff --git a/pom.xml b/pom.xml index bc3619a..05ed064 100644 --- a/pom.xml +++ b/pom.xml @@ -285,6 +285,12 @@ kafka-clients 3.4.0 + + + io.netty + netty-all + 4.1.92.Final + diff --git a/src/main/java/UnitTest/HexStringSender.java b/src/main/java/UnitTest/HexStringSender.java new file mode 100644 index 0000000..28c97fa --- /dev/null +++ b/src/main/java/UnitTest/HexStringSender.java @@ -0,0 +1,64 @@ +package UnitTest; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; + +public class HexStringSender { + public static String IP = "10.10.21.18"; + public static int PORT = 8001; + //# 开 + public static String open_hex_str = "16 00 34 F5 41 11 FE 82 0D 02 6D FF 00 00 00 00 00 00 01 00 00 01"; + //关 + public static String close_hex_str = "16 00 34 F5 41 11 FE 82 0D 02 6D FF 00 00 00 00 00 00 01 00 00 00"; + + public static void main(String[] args) throws IOException, InterruptedException { + for (int i = 1; ; i++) { + Socket socket = new Socket(IP, PORT); + String hex_str; + if (i % 2 == 0) hex_str = close_hex_str; + else hex_str = open_hex_str; + byte[] bytes = hexStringToByteArray(hex_str); + OutputStream os = socket.getOutputStream(); + os.write(bytes); + socket.close(); + + Thread.sleep(2000); + } + } + + /** + * 16进制表示的字符串转换为字节数组 + * + * @param hexString 16进制表示的字符串 + * @return byte[] 字节数组 + */ + public static byte[] hexStringToByteArray(String hexString) { + hexString = hexString.replaceAll(" ", ""); + int len = hexString.length(); + byte[] bytes = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + // 两位一组,表示一个字节,把这样表示的16进制字符串,还原成一个字节 + bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + Character + .digit(hexString.charAt(i + 1), 16)); + } + return bytes; + } + + /** + * 将字节数组转换成十六进制的字符串 + * + * @return + */ + public static String BinaryToHexString(byte[] bytes) { + String hexStr = "0123456789ABCDEF"; + String result = ""; + String hex = ""; + for (byte b : bytes) { + hex = String.valueOf(hexStr.charAt((b & 0xF0) >> 4)); + hex += String.valueOf(hexStr.charAt(b & 0x0F)); + result += hex + " "; + } + return result; + } +} diff --git a/src/main/java/UnitTest/KafkaProductorTest.java b/src/main/java/UnitTest/KafkaProductorTest.java index 124037a..3c70959 100644 --- a/src/main/java/UnitTest/KafkaProductorTest.java +++ b/src/main/java/UnitTest/KafkaProductorTest.java @@ -53,12 +53,22 @@ public class KafkaProductorTest { //指定发送分区 String TOPIC_NAME = "test"; ProducerRecord producerRecord = new ProducerRecord<>(TOPIC_NAME - , 0, "HuangHai_" + i, "黄海_" + i); + , 0, "HuangHai_" + i, "{\"id\":" + i + ",\"name\"=\"HuangHai_" + i + "\"}"); //等待消息发送成功的同步阻塞方法 - RecordMetadata metadata = producer.send(producerRecord).get(); - System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" - + metadata.partition() + "|offset-" + metadata.offset() + "|" + i); + //RecordMetadata metadata = producer.send(producerRecord).get(); + //System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + //+ metadata.partition() + "|offset-" + metadata.offset() + "|" + i); + + // 异步发送消息 + producer.send(producerRecord, (metadata, exception) -> { + if (exception != null) { + System.err.println("异步发送消息失败:" + exception.getMessage()); + } else { + System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + + metadata.partition() + "|offset-" + metadata.offset()); + } + }); } producer.close(); } diff --git a/src/main/java/UnitTest/Consumer.java b/src/main/java/UnitTest/RocketMqConsumer.java similarity index 94% rename from src/main/java/UnitTest/Consumer.java rename to src/main/java/UnitTest/RocketMqConsumer.java index b624d3d..70537d6 100644 --- a/src/main/java/UnitTest/Consumer.java +++ b/src/main/java/UnitTest/RocketMqConsumer.java @@ -8,7 +8,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; /** * @Date 2021-04-20 15:47 */ -public class Consumer { +public class RocketMqConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group"); consumer.setNamesrvAddr("10.10.14.231:9876"); diff --git a/src/main/java/UnitTest/SocketUtilTest.java b/src/main/java/UnitTest/SocketUtilTest.java new file mode 100644 index 0000000..a2eaafd --- /dev/null +++ b/src/main/java/UnitTest/SocketUtilTest.java @@ -0,0 +1,40 @@ +package UnitTest; + +import com.dsideal.FengHuang.Util.SocketUtil; + +public class SocketUtilTest { + + public static void Open(String host, int port, String sb_id) throws Exception { + //开 + String hexData = "16 00 34 F5 41 11 FE 82 0D 02 ? 00 00 00 00 00 00 01 00 00 01".replace("?", sb_id).replace(" ", ""); + SocketUtil sender = new SocketUtil(host, port); + sender.sendHexData(hexData); + } + + public static void Close(String host, int port, String sb_id) throws Exception { + //关 + String hexData = "16 00 34 F5 41 11 FE 82 0D 02 ? 00 00 00 00 00 00 01 00 00 00".replace("?", sb_id).replace(" ", ""); + SocketUtil sender = new SocketUtil(host, port); + sender.sendHexData(hexData); + } + + public static void getList(String host, int port) throws Exception { + //String gateWay = "1141f534"; + String hexData = "08 00 34 f5 41 11 FE 81"; + hexData=hexData.replace(" ",""); + SocketUtil sender = new SocketUtil(host, port); + sender.sendHexData(hexData); + } + + public static void main(String[] args) throws Exception { + String HOST = "10.10.21.18"; + int PORT = 8001; + String sb_id = "6D FF"; + for (int i = 1; ; i++) { + if (i % 2 == 1) Open(HOST, PORT, sb_id); + else Close(HOST, PORT, sb_id); + Thread.sleep(1000); + } +// getList(HOST,PORT); + } +} diff --git a/src/main/java/UnitTest/kafkaConsumerTest.java b/src/main/java/UnitTest/kafkaConsumerTest.java index c338786..29eb23e 100644 --- a/src/main/java/UnitTest/kafkaConsumerTest.java +++ b/src/main/java/UnitTest/kafkaConsumerTest.java @@ -1,14 +1,25 @@ package UnitTest; -import org.apache.kafka.clients.consumer.*; +import com.alibaba.fastjson.JSONObject; +import com.jfinal.kit.PropKit; +import com.jfinal.plugin.activerecord.ActiveRecordPlugin; +import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.Record; +import com.jfinal.plugin.druid.DruidPlugin; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.text.ParseException; -import java.text.SimpleDateFormat; import java.time.Duration; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; public class kafkaConsumerTest { @@ -45,10 +56,7 @@ public class kafkaConsumerTest { //一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); - /* - 如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱, - 会将其踢出消费组,将分区分配给别的consumer消费 - */ + //如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -56,32 +64,37 @@ public class kafkaConsumerTest { } public static void main(String[] args) throws ParseException { - String TOPIC_NAME = "test"; - KafkaConsumer consumer = new KafkaConsumer<>(initConfig()); - //如果使用自动分区分配,那就不要使用手动分区分配,反之亦然,两者不可同时使用,使用一种方法,就注释掉另一种方法。 - //如使用手动分区分配,那么注释掉自动分区分配。 - //https://blog.csdn.net/qq_41413743/article/details/123636586 - // consumer.subscribe(Arrays.asList(TOPIC_NAME)); + //1、配置数据库 + PropKit.use("application.properties"); + + DruidPlugin druid = new DruidPlugin(PropKit.get("jdbcUrl"), PropKit.get("user"), + PropKit.get("password").trim(), PropKit.get("driverClassName")); + druid.start(); + // 配置ActiveRecord插件 + ActiveRecordPlugin arp = new ActiveRecordPlugin(druid); + arp.start(); + + //2、读取Kafka + String TOPIC_NAME = "maxwell"; + KafkaConsumer consumer = new KafkaConsumer<>(initConfig()); // 消费指定分区 consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); //消息回溯消费 - /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); - consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/ + consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); + consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); //指定offset消费 - /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); - consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/ + //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); + //consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095);//注意:这里的包含3095的!!!!! //从指定时间点开始消费 List topicPartitions = consumer.partitionsFor(TOPIC_NAME); - - //从1小时前开始消费 -// long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; - + /* + //一般使用offset就可以满足所有需求了,不需要使用按时间点 //从某个时间点获取之后的数据 String dateString = "2023-05-10 08:38:00"; SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -111,33 +124,38 @@ public class kafkaConsumerTest { consumer.seek(key, offset); } } - + */ while (true) { - /* - * poll() API 是拉取消息的长轮询 - */ - ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + List writeList = new ArrayList(); + List delList = new ArrayList(); + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { - System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s ,timestamp = %d%n", record.partition(), - record.offset(), record.key(), record.value(), record.timestamp()); + //kafka中的offset是一个64位的有符号整数 + //64位的有符号整数的最大值9223372036854775807 + Record r = new Record(); + r.set("offset", record.offset()); + r.set("timestamp", record.timestamp()); + JSONObject jo = JSONObject.parseObject(record.value()); + r.set("database", jo.get("database")); + r.set("table", jo.get("table")); + r.set("type", jo.get("type")); + writeList.add(r); + String sql = "delete from t_sync_log where offset = " + record.offset(); + delList.add(sql); } if (records.count() > 0) { - // 手动同步提交offset,当前线程会阻塞直到offset提交成功 - // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了 - consumer.commitSync(); - - // 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑 - /*consumer.commitAsync(new OffsetCommitCallback() { - @Override - public void onComplete(Map offsets, Exception exception) { - if (exception != null) { - System.err.println("Commit failed for " + offsets); - System.err.println("Commit failed exception: " + exception.getStackTrace()); - } + System.out.println("将插入数据条数:" + writeList.size()); + consumer.commitAsync((offsets, exception) -> { + if (exception != null) { + System.err.println("Commit failed for " + offsets); + System.err.println("Commit failed exception: " + exception.getStackTrace()); + } else { + if (delList.size() > 0) Db.batch(delList, 100); + if (writeList.size() > 0) Db.batchSave("t_sync_log", writeList, 100); } - });*/ + }); } } } diff --git a/src/main/java/UnitTest/t_sync_log.sql b/src/main/java/UnitTest/t_sync_log.sql new file mode 100644 index 0000000..5b3138d --- /dev/null +++ b/src/main/java/UnitTest/t_sync_log.sql @@ -0,0 +1,34 @@ +/* + Navicat Premium Data Transfer + + Source Server : 10.10.14.169 + Source Server Type : MySQL + Source Server Version : 100505 (10.5.5-MariaDB-log) + Source Host : 10.10.14.169:22066 + Source Schema : ccdjzswd_db + + Target Server Type : MySQL + Target Server Version : 100505 (10.5.5-MariaDB-log) + File Encoding : 65001 + + Date: 10/05/2023 15:21:01 +*/ + +SET NAMES utf8mb4; +SET FOREIGN_KEY_CHECKS = 0; + +-- ---------------------------- +-- Table structure for t_sync_log +-- ---------------------------- +DROP TABLE IF EXISTS `t_sync_log`; +CREATE TABLE `t_sync_log` ( + `offset` bigint NOT NULL, + `timestamp` bigint NOT NULL, + `database` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `table` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + PRIMARY KEY (`offset`) USING BTREE, + INDEX `database`(`database` ASC, `table` ASC, `type` ASC) USING BTREE +) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; + +SET FOREIGN_KEY_CHECKS = 1; diff --git a/src/main/java/com/dsideal/FengHuang/Util/SocketUtil.java b/src/main/java/com/dsideal/FengHuang/Util/SocketUtil.java new file mode 100644 index 0000000..966cbcb --- /dev/null +++ b/src/main/java/com/dsideal/FengHuang/Util/SocketUtil.java @@ -0,0 +1,112 @@ +package com.dsideal.FengHuang.Util; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.MessageToByteEncoder; + +import java.nio.charset.StandardCharsets; + +public class SocketUtil { + private final String host; + private final int port; + + public SocketUtil(String host, int port) { + this.host = host; + this.port = port; + } + + public void sendHexData(String hexData) throws Exception { + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + Bootstrap b = new Bootstrap(); + b.group(workerGroup); + b.channel(NioSocketChannel.class); + b.option(ChannelOption.SO_KEEPALIVE, true); + b.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HexDataEncoder()); + p.addLast(new HexDataSenderHandler(hexData)); + p.addLast(new ClientHandler()); // 添加客户端 Handler + } + }); + + // Start the client. + ChannelFuture f = b.connect(host, port).sync(); + // Wait until the connection is closed. + //f.channel().closeFuture().sync(); + //手动关闭 + f.channel().close(); + } finally { + workerGroup.shutdownGracefully(); + } + } + + class HexDataSenderHandler extends ChannelInboundHandlerAdapter { + private final String hexData; + + public HexDataSenderHandler(String hexData) { + this.hexData = hexData; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + byte[] bytes = hexStringToByteArray(hexData); + ByteBuf buffer = Unpooled.buffer(bytes.length); + buffer.writeBytes(bytes); + ctx.writeAndFlush(buffer); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + + private byte[] hexStringToByteArray(String hexData) { + int len = hexData.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(hexData.charAt(i), 16) << 4) + + Character.digit(hexData.charAt(i + 1), 16)); + } + return data; + } + + + } + + static class ClientHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + // 处理服务器响应数据 + byte[] bytes = (byte[]) msg; + String response = new String(bytes, StandardCharsets.US_ASCII); + System.out.println("Server response: " + response); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + } + + static class HexDataEncoder extends MessageToByteEncoder { + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { + byte[] bytes = new byte[msg.readableBytes()]; + msg.getBytes(msg.readerIndex(), bytes); + out.writeBytes(bytes); + } + } + +} diff --git a/src/main/resource/ExcelExportTemplate/exam.json b/src/main/resource/ExcelExportTemplate/exam.json index 5793061..9830166 100644 --- a/src/main/resource/ExcelExportTemplate/exam.json +++ b/src/main/resource/ExcelExportTemplate/exam.json @@ -1,6 +1,6 @@ { - "title": "党建知识问答排名结果", - "sheetName": "结果表", + "title": "长春市教育局学习贯彻习近平新时代中国特色社会主义思想知识答题", + "sheetName": "结果", "titleHeight": 30, "rowHeight": 30, "showNumber": true, @@ -8,37 +8,37 @@ { "show_column_name": "姓名", "list_column_name": "person_name", - "width": 40 + "width": 20 }, { "show_column_name": "处室", "list_column_name": "ks", - "width": 40 + "width": 30 }, { "show_column_name": "电话", "list_column_name": "tel", - "width": 40 + "width": 30 }, { "show_column_name": "得分", "list_column_name": "score", - "width": 40 + "width": 20 }, { "show_column_name": "用时", "list_column_name": "ys", - "width": 40 + "width": 20 }, { "show_column_name": "开始时间", "list_column_name": "start_time", - "width": 40 + "width": 32 }, { "show_column_name": "交卷时间", "list_column_name": "end_time", - "width": 40 + "width": 32 } ] } \ No newline at end of file diff --git a/target/classes/ExcelExportTemplate/exam.json b/target/classes/ExcelExportTemplate/exam.json index 5793061..9830166 100644 --- a/target/classes/ExcelExportTemplate/exam.json +++ b/target/classes/ExcelExportTemplate/exam.json @@ -1,6 +1,6 @@ { - "title": "党建知识问答排名结果", - "sheetName": "结果表", + "title": "长春市教育局学习贯彻习近平新时代中国特色社会主义思想知识答题", + "sheetName": "结果", "titleHeight": 30, "rowHeight": 30, "showNumber": true, @@ -8,37 +8,37 @@ { "show_column_name": "姓名", "list_column_name": "person_name", - "width": 40 + "width": 20 }, { "show_column_name": "处室", "list_column_name": "ks", - "width": 40 + "width": 30 }, { "show_column_name": "电话", "list_column_name": "tel", - "width": 40 + "width": 30 }, { "show_column_name": "得分", "list_column_name": "score", - "width": 40 + "width": 20 }, { "show_column_name": "用时", "list_column_name": "ys", - "width": 40 + "width": 20 }, { "show_column_name": "开始时间", "list_column_name": "start_time", - "width": 40 + "width": 32 }, { "show_column_name": "交卷时间", "list_column_name": "end_time", - "width": 40 + "width": 32 } ] } \ No newline at end of file