kgdxpr 2 years ago
commit 54146b86e5

@ -285,6 +285,12 @@
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<!--引入netty-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.92.Final</version>
</dependency>
</dependencies>
<build>
<plugins>

@ -0,0 +1,64 @@
package UnitTest;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
public class HexStringSender {
public static String IP = "10.10.21.18";
public static int PORT = 8001;
//# 开
public static String open_hex_str = "16 00 34 F5 41 11 FE 82 0D 02 6D FF 00 00 00 00 00 00 01 00 00 01";
//关
public static String close_hex_str = "16 00 34 F5 41 11 FE 82 0D 02 6D FF 00 00 00 00 00 00 01 00 00 00";
public static void main(String[] args) throws IOException, InterruptedException {
for (int i = 1; ; i++) {
Socket socket = new Socket(IP, PORT);
String hex_str;
if (i % 2 == 0) hex_str = close_hex_str;
else hex_str = open_hex_str;
byte[] bytes = hexStringToByteArray(hex_str);
OutputStream os = socket.getOutputStream();
os.write(bytes);
socket.close();
Thread.sleep(2000);
}
}
/**
* 16
*
* @param hexString 16
* @return byte[]
*/
public static byte[] hexStringToByteArray(String hexString) {
hexString = hexString.replaceAll(" ", "");
int len = hexString.length();
byte[] bytes = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
// 两位一组,表示一个字节,把这样表示的16进制字符串还原成一个字节
bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + Character
.digit(hexString.charAt(i + 1), 16));
}
return bytes;
}
/**
*
*
* @return
*/
public static String BinaryToHexString(byte[] bytes) {
String hexStr = "0123456789ABCDEF";
String result = "";
String hex = "";
for (byte b : bytes) {
hex = String.valueOf(hexStr.charAt((b & 0xF0) >> 4));
hex += String.valueOf(hexStr.charAt(b & 0x0F));
result += hex + " ";
}
return result;
}
}

@ -53,12 +53,22 @@ public class KafkaProductorTest {
//指定发送分区
String TOPIC_NAME = "test";
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME
, 0, "HuangHai_" + i, "黄海_" + i);
, 0, "HuangHai_" + i, "{\"id\":" + i + ",\"name\"=\"HuangHai_" + i + "\"}");
//等待消息发送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset() + "|" + i);
//RecordMetadata metadata = producer.send(producerRecord).get();
//System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
//+ metadata.partition() + "|offset-" + metadata.offset() + "|" + i);
// 异步发送消息
producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
System.err.println("异步发送消息失败:" + exception.getMessage());
} else {
System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
});
}
producer.close();
}

@ -8,7 +8,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
/**
* @Date 2021-04-20 15:47
*/
public class Consumer {
public class RocketMqConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group");
consumer.setNamesrvAddr("10.10.14.231:9876");

@ -0,0 +1,40 @@
package UnitTest;
import com.dsideal.FengHuang.Util.SocketUtil;
public class SocketUtilTest {
public static void Open(String host, int port, String sb_id) throws Exception {
//开
String hexData = "16 00 34 F5 41 11 FE 82 0D 02 ? 00 00 00 00 00 00 01 00 00 01".replace("?", sb_id).replace(" ", "");
SocketUtil sender = new SocketUtil(host, port);
sender.sendHexData(hexData);
}
public static void Close(String host, int port, String sb_id) throws Exception {
//关
String hexData = "16 00 34 F5 41 11 FE 82 0D 02 ? 00 00 00 00 00 00 01 00 00 00".replace("?", sb_id).replace(" ", "");
SocketUtil sender = new SocketUtil(host, port);
sender.sendHexData(hexData);
}
public static void getList(String host, int port) throws Exception {
//String gateWay = "1141f534";
String hexData = "08 00 34 f5 41 11 FE 81";
hexData=hexData.replace(" ","");
SocketUtil sender = new SocketUtil(host, port);
sender.sendHexData(hexData);
}
public static void main(String[] args) throws Exception {
String HOST = "10.10.21.18";
int PORT = 8001;
String sb_id = "6D FF";
for (int i = 1; ; i++) {
if (i % 2 == 1) Open(HOST, PORT, sb_id);
else Close(HOST, PORT, sb_id);
Thread.sleep(1000);
}
// getList(HOST,PORT);
}
}

@ -1,14 +1,25 @@
package UnitTest;
import org.apache.kafka.clients.consumer.*;
import com.alibaba.fastjson.JSONObject;
import com.jfinal.kit.PropKit;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.druid.DruidPlugin;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class kafkaConsumerTest {
@ -45,10 +56,7 @@ public class kafkaConsumerTest {
//一次poll最大拉取消息的条数如果消费者处理速度很快可以设置大点如果处理速度一般可以设置小点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
/*
pollbrokerconsumer
consumer
*/
//如果两次poll操作间隔超过了这个时间broker就会认为这个consumer处理能力太弱会将其踢出消费组将分区分配给别的consumer消费
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@ -56,32 +64,37 @@ public class kafkaConsumerTest {
}
public static void main(String[] args) throws ParseException {
String TOPIC_NAME = "test";
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());
//如果使用自动分区分配,那就不要使用手动分区分配,反之亦然,两者不可同时使用,使用一种方法,就注释掉另一种方法。
//如使用手动分区分配,那么注释掉自动分区分配。
//https://blog.csdn.net/qq_41413743/article/details/123636586
// consumer.subscribe(Arrays.asList(TOPIC_NAME));
//1、配置数据库
PropKit.use("application.properties");
DruidPlugin druid = new DruidPlugin(PropKit.get("jdbcUrl"), PropKit.get("user"),
PropKit.get("password").trim(), PropKit.get("driverClassName"));
druid.start();
// 配置ActiveRecord插件
ActiveRecordPlugin arp = new ActiveRecordPlugin(druid);
arp.start();
//2、读取Kafka
String TOPIC_NAME = "maxwell";
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());
// 消费指定分区
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//消息回溯消费
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//指定offset消费
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/
//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//consumer.seek(new TopicPartition(TOPIC_NAME, 0), 3095);//注意这里的包含3095的
//从指定时间点开始消费
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//从1小时前开始消费
// long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
/*
//一般使用offset就可以满足所有需求了不需要使用按时间点
//从某个时间点获取之后的数据
String dateString = "2023-05-10 08:38:00";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@ -111,33 +124,38 @@ public class kafkaConsumerTest {
consumer.seek(key, offset);
}
}
*/
while (true) {
/*
* poll() API
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
List<Record> writeList = new ArrayList();
List<String> delList = new ArrayList();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息partition = %d,offset = %d, key = %s, value = %s ,timestamp = %d%n", record.partition(),
record.offset(), record.key(), record.value(), record.timestamp());
//kafka中的offset是一个64位的有符号整数
//64位的有符号整数的最大值9223372036854775807
Record r = new Record();
r.set("offset", record.offset());
r.set("timestamp", record.timestamp());
JSONObject jo = JSONObject.parseObject(record.value());
r.set("database", jo.get("database"));
r.set("table", jo.get("table"));
r.set("type", jo.get("type"));
writeList.add(r);
String sql = "delete from t_sync_log where offset = " + record.offset();
delList.add(sql);
}
if (records.count() > 0) {
// 手动同步提交offset当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
consumer.commitSync();
// 手动异步提交offset当前线程提交offset不会阻塞可以继续处理后面的程序逻辑
/*consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
System.out.println("将插入数据条数:" + writeList.size());
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
} else {
if (delList.size() > 0) Db.batch(delList, 100);
if (writeList.size() > 0) Db.batchSave("t_sync_log", writeList, 100);
}
});*/
});
}
}
}

@ -0,0 +1,34 @@
/*
Navicat Premium Data Transfer
Source Server : 10.10.14.169
Source Server Type : MySQL
Source Server Version : 100505 (10.5.5-MariaDB-log)
Source Host : 10.10.14.169:22066
Source Schema : ccdjzswd_db
Target Server Type : MySQL
Target Server Version : 100505 (10.5.5-MariaDB-log)
File Encoding : 65001
Date: 10/05/2023 15:21:01
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for t_sync_log
-- ----------------------------
DROP TABLE IF EXISTS `t_sync_log`;
CREATE TABLE `t_sync_log` (
`offset` bigint NOT NULL,
`timestamp` bigint NOT NULL,
`database` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`table` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
PRIMARY KEY (`offset`) USING BTREE,
INDEX `database`(`database` ASC, `table` ASC, `type` ASC) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;

@ -0,0 +1,112 @@
package com.dsideal.FengHuang.Util;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
public class SocketUtil {
private final String host;
private final int port;
public SocketUtil(String host, int port) {
this.host = host;
this.port = port;
}
public void sendHexData(String hexData) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HexDataEncoder());
p.addLast(new HexDataSenderHandler(hexData));
p.addLast(new ClientHandler()); // 添加客户端 Handler
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
//f.channel().closeFuture().sync();
//手动关闭
f.channel().close();
} finally {
workerGroup.shutdownGracefully();
}
}
class HexDataSenderHandler extends ChannelInboundHandlerAdapter {
private final String hexData;
public HexDataSenderHandler(String hexData) {
this.hexData = hexData;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
byte[] bytes = hexStringToByteArray(hexData);
ByteBuf buffer = Unpooled.buffer(bytes.length);
buffer.writeBytes(bytes);
ctx.writeAndFlush(buffer);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
private byte[] hexStringToByteArray(String hexData) {
int len = hexData.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(hexData.charAt(i), 16) << 4) +
Character.digit(hexData.charAt(i + 1), 16));
}
return data;
}
}
static class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 处理服务器响应数据
byte[] bytes = (byte[]) msg;
String response = new String(bytes, StandardCharsets.US_ASCII);
System.out.println("Server response: " + response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
static class HexDataEncoder extends MessageToByteEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
byte[] bytes = new byte[msg.readableBytes()];
msg.getBytes(msg.readerIndex(), bytes);
out.writeBytes(bytes);
}
}
}

@ -1,6 +1,6 @@
{
"title": "党建知识问答排名结果",
"sheetName": "结果",
"title": "长春市教育局学习贯彻习近平新时代中国特色社会主义思想知识答题",
"sheetName": "结果",
"titleHeight": 30,
"rowHeight": 30,
"showNumber": true,
@ -8,37 +8,37 @@
{
"show_column_name": "姓名",
"list_column_name": "person_name",
"width": 40
"width": 20
},
{
"show_column_name": "处室",
"list_column_name": "ks",
"width": 40
"width": 30
},
{
"show_column_name": "电话",
"list_column_name": "tel",
"width": 40
"width": 30
},
{
"show_column_name": "得分",
"list_column_name": "score",
"width": 40
"width": 20
},
{
"show_column_name": "用时",
"list_column_name": "ys",
"width": 40
"width": 20
},
{
"show_column_name": "开始时间",
"list_column_name": "start_time",
"width": 40
"width": 32
},
{
"show_column_name": "交卷时间",
"list_column_name": "end_time",
"width": 40
"width": 32
}
]
}

@ -1,6 +1,6 @@
{
"title": "党建知识问答排名结果",
"sheetName": "结果",
"title": "长春市教育局学习贯彻习近平新时代中国特色社会主义思想知识答题",
"sheetName": "结果",
"titleHeight": 30,
"rowHeight": 30,
"showNumber": true,
@ -8,37 +8,37 @@
{
"show_column_name": "姓名",
"list_column_name": "person_name",
"width": 40
"width": 20
},
{
"show_column_name": "处室",
"list_column_name": "ks",
"width": 40
"width": 30
},
{
"show_column_name": "电话",
"list_column_name": "tel",
"width": 40
"width": 30
},
{
"show_column_name": "得分",
"list_column_name": "score",
"width": 40
"width": 20
},
{
"show_column_name": "用时",
"list_column_name": "ys",
"width": 40
"width": 20
},
{
"show_column_name": "开始时间",
"list_column_name": "start_time",
"width": 40
"width": 32
},
{
"show_column_name": "交卷时间",
"list_column_name": "end_time",
"width": 40
"width": 32
}
]
}
Loading…
Cancel
Save