From cc6c8b425d824f5398ed7e04a58aa97229a48234 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Wed, 31 Jul 2024 14:20:38 +0800 Subject: [PATCH] 'commit' --- Ylt/ms-task/pom.xml | 6 + .../java/com/charge/Test/TestRocketMQ.java | 142 ++++++++++++++++++ Ylt/ms-task/src/test/java/TaskTest2.java | 89 +++++++++++ 3 files changed, 237 insertions(+) create mode 100644 Ylt/ms-task/src/main/java/com/charge/Test/TestRocketMQ.java create mode 100644 Ylt/ms-task/src/test/java/TaskTest2.java diff --git a/Ylt/ms-task/pom.xml b/Ylt/ms-task/pom.xml index ba1fe27..43c4700 100644 --- a/Ylt/ms-task/pom.xml +++ b/Ylt/ms-task/pom.xml @@ -123,6 +123,12 @@ + + + org.apache.rocketmq + rocketmq-client + 4.8.0 + com.aliyun.openservices diff --git a/Ylt/ms-task/src/main/java/com/charge/Test/TestRocketMQ.java b/Ylt/ms-task/src/main/java/com/charge/Test/TestRocketMQ.java new file mode 100644 index 0000000..b6b1ad9 --- /dev/null +++ b/Ylt/ms-task/src/main/java/com/charge/Test/TestRocketMQ.java @@ -0,0 +1,142 @@ +package com.charge.Test; + +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +import java.io.IOException; +import java.util.List; + +//Java整合RocketMQ实现生产消费 +//https://blog.csdn.net/qq_28314431/article/details/128452607 + +public class TestRocketMQ { + private final static String nameServer = "10.10.14.210:9876"; + + private final static String producerGroup = "my_group"; + private final static String consumerGroup = "my_group"; + private final static String topic = "topic-test"; + + //同步发送 + public static void syncSend() { + try { + // 初始化一个producer并设置Producer group name + DefaultMQProducer producer = new DefaultMQProducer(producerGroup); + // 设置NameServer地址 + producer.setNamesrvAddr(nameServer); + // 启动producer + producer.start(); + // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤 + Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); + // 利用producer进行发送,并同步等待发送结果 + SendResult sendResult = producer.send(msg, 10000); + System.out.printf("%s%n", sendResult); + // 一旦producer不再使用,关闭producer + producer.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + //异步发送 + public static void asyncSend() throws IOException { + try { + // 初始化一个producer并设置Producer group name + DefaultMQProducer producer = new DefaultMQProducer(producerGroup); + // 设置NameServer地址 + producer.setNamesrvAddr(nameServer); + // 启动producer + producer.start(); + // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤 + Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); + // 异步发送消息, 发送结果通过callback返回给客户端 + producer.send(msg, new SendCallback() { + public void onSuccess(SendResult sendResult) { + System.out.printf("OK %s %n", + sendResult.getMsgId()); + } + + public void onException(Throwable e) { + System.out.printf("Exception %s %n", e); + e.printStackTrace(); + } + }, 10000); + } catch (Exception e) { + e.printStackTrace(); + } + System.in.read(); + } + + //顺序消息 + public static void orderSend() { + try { + // 初始化一个producer并设置Producer group name + DefaultMQProducer producer = new DefaultMQProducer(producerGroup); + // 设置NameServer地址 + producer.setNamesrvAddr(nameServer); + producer.setSendMsgTimeout(10000); + // 启动producer + producer.start(); + String[] tags = new String[]{"TagA", "TagB", "TagC"}; + for (int i = 0; i < 10; i++) { + int orderId = i % 10; + Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg, new MessageQueueSelector() { + public MessageQueue select(List mqs, Message msg, Object arg) { + Integer id = (Integer) arg; + int index = id % mqs.size(); + return mqs.get(index); + } + }, orderId); + System.out.printf("%s%n", sendResult); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + + public static void consumerPull() { + try { + DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup); + consumer.setNamesrvAddr(nameServer); + //关闭自动提交 + consumer.setAutoCommit(false); + consumer.subscribe(topic, "*"); + consumer.setPullBatchSize(20); + consumer.start(); + while (true) { + List messageExts = consumer.poll(); + if (messageExts.size() > 0) { + String str = new String(messageExts.get(0).getBody()); + System.out.println(str); + } else { + System.out.println("本轮没有找到需要消费的消息!"); + } + consumer.commitSync(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void main(String[] args) throws IOException { + //同步发送 + syncSend(); + + //异步发送 + //asyncSend(); + + //顺序消息 + //orderSend(); + + //拉消费 + consumerPull(); + } +} diff --git a/Ylt/ms-task/src/test/java/TaskTest2.java b/Ylt/ms-task/src/test/java/TaskTest2.java new file mode 100644 index 0000000..33dd30b --- /dev/null +++ b/Ylt/ms-task/src/test/java/TaskTest2.java @@ -0,0 +1,89 @@ + +import com.charge.task.TaskApplication; +import com.charge.task.service.CompanyCarChargeReportService; +import com.charge.task.service.CompanyChargeDailyStatisticsGenerateService; +import com.charge.task.service.TaskService; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; +import java.util.stream.Stream; + + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = TaskApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class TaskTest2 { + + @Autowired + TaskService taskService; + + @Resource + private CompanyChargeDailyStatisticsGenerateService companyChargeDailyStatisticsGenerateService; + + @Autowired + CompanyCarChargeReportService companyCarChargeReportService; + + + @Test + public void test01(){ + companyCarChargeReportService.insertCompanyCarChargeReport("2022-10-17"); + } + + + @Test + public void test(){ +// PileOrderMap pileOrderInfo = pileOrderMapDao.findOne("2020033123472571578"); +// String timeShare = timeShareComputeStr(pileOrderInfo.getMultiChargeDegree()); +// String chargeBeginTime = ToolDateTime.dateToDateString(pileOrderInfo.getChargeBeginTime()); +// String chargeEndTime = ToolDateTime.dateToDateString(pileOrderInfo.getChargeEndTime()); +// +// EquipmentChargeOrderDO orderInfo = new EquipmentChargeOrderDO(); +// orderInfo.setChargeTimesDegree(timeShare); +// orderInfo.setChargeBeginTime(pileOrderInfo.getChargeBeginTime()); +// orderInfo.setChargeEndTime(pileOrderInfo.getChargeEndTime()); +// orderInfo.setChargeDuration(pileOrderInfo.getChargeDuration()); +// orderInfo.setChargeBeginDegree(pileOrderInfo.getChargeBeginDegree()); +// orderInfo.setChargeEndDegree(pileOrderInfo.getChargeEndDegree()); +// orderInfo.setChargeDegree(pileOrderInfo.getChargeDegree()); +// orderInfo.setChargeBeginSoc(Float.valueOf(String.valueOf(pileOrderInfo.getChargeBeginSoc()))); +// orderInfo.setChargeEndSoc(Float.valueOf(String.valueOf(pileOrderInfo.getChargeEndSoc()))); +// orderInfo.setChargeCurSoc(Float.valueOf(String.valueOf(pileOrderInfo.getChargeEndSoc()))); +// orderInfo.setChargeStrategy(pileOrderInfo.getChargeStrategy()); +// orderInfo.setChargeStrategyParam(pileOrderInfo.getChargeStrategyParam()); +// orderInfo.setBootMode(pileOrderInfo.getBootMode()); +// orderInfo.setChargeVin(pileOrderInfo.getChargeVin()); +// orderInfo.setChargeAh((float)pileOrderInfo.getChargeDegree()*100); +// orderInfo.setFinishCode(44); +// orderInfo.setFinishType(2); +// orderInfo.setFinishMsg("补单结束"); +// orderInfo.setFinishTime(ToolDateTime.getDate()); +// +// System.out.println(chargeBeginTime); +// System.out.println(chargeEndTime); +// System.out.println(timeShare); + } + + + + private String timeShareComputeStr(double[] chargeTimesDegree){ + StringBuilder chargeTimesDegreeStr = new StringBuilder(); + Stream.iterate(0, i->i+1).limit(chargeTimesDegree.length).forEach(i->{ + chargeTimesDegreeStr.append(chargeTimesDegree[i]); + if (i < chargeTimesDegree.length - 1) { + chargeTimesDegreeStr.append(","); + } + }); + return chargeTimesDegreeStr.toString(); + } + + @Test + public void companyChargeDailyStatisticsGenerateServiceTest() { + for (int i = 20; i < 31; i++) { + companyChargeDailyStatisticsGenerateService.generateByDateTransaction("2022-09-" + String.format("%02d", i)); + } + } + +}