main
黄海 12 months ago
parent 995fe69606
commit cc6c8b425d

@ -123,6 +123,12 @@
</exclusion></exclusions>
</dependency>
<!--黄海为了测试本地的RocketMQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
<!--消息队列 RocketMQ-->
<dependency>
<groupId>com.aliyun.openservices</groupId>

@ -0,0 +1,142 @@
package com.charge.Test;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.IOException;
import java.util.List;
//Java整合RocketMQ实现生产消费
//https://blog.csdn.net/qq_28314431/article/details/128452607
public class TestRocketMQ {
private final static String nameServer = "10.10.14.210:9876";
private final static String producerGroup = "my_group";
private final static String consumerGroup = "my_group";
private final static String topic = "topic-test";
//同步发送
public static void syncSend() {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 启动producer
producer.start();
// 创建一条消息并指定topic、tag、body等信息tag可以理解成标签对消息进行再归类RocketMQ可以在消费端对tag进行过滤
Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 利用producer进行发送并同步等待发送结果
SendResult sendResult = producer.send(msg, 10000);
System.out.printf("%s%n", sendResult);
// 一旦producer不再使用关闭producer
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
//异步发送
public static void asyncSend() throws IOException {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
// 启动producer
producer.start();
// 创建一条消息并指定topic、tag、body等信息tag可以理解成标签对消息进行再归类RocketMQ可以在消费端对tag进行过滤
Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息, 发送结果通过callback返回给客户端
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.printf("OK %s %n",
sendResult.getMsgId());
}
public void onException(Throwable e) {
System.out.printf("Exception %s %n", e);
e.printStackTrace();
}
}, 10000);
} catch (Exception e) {
e.printStackTrace();
}
System.in.read();
}
//顺序消息
public static void orderSend() {
try {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 设置NameServer地址
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(10000);
// 启动producer
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
int orderId = i % 10;
Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void consumerPull() {
try {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
//关闭自动提交
consumer.setAutoCommit(false);
consumer.subscribe(topic, "*");
consumer.setPullBatchSize(20);
consumer.start();
while (true) {
List<MessageExt> messageExts = consumer.poll();
if (messageExts.size() > 0) {
String str = new String(messageExts.get(0).getBody());
System.out.println(str);
} else {
System.out.println("本轮没有找到需要消费的消息!");
}
consumer.commitSync();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
//同步发送
syncSend();
//异步发送
//asyncSend();
//顺序消息
//orderSend();
//拉消费
consumerPull();
}
}

@ -0,0 +1,89 @@
import com.charge.task.TaskApplication;
import com.charge.task.service.CompanyCarChargeReportService;
import com.charge.task.service.CompanyChargeDailyStatisticsGenerateService;
import com.charge.task.service.TaskService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.stream.Stream;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TaskApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class TaskTest2 {
@Autowired
TaskService taskService;
@Resource
private CompanyChargeDailyStatisticsGenerateService companyChargeDailyStatisticsGenerateService;
@Autowired
CompanyCarChargeReportService companyCarChargeReportService;
@Test
public void test01(){
companyCarChargeReportService.insertCompanyCarChargeReport("2022-10-17");
}
@Test
public void test(){
// PileOrderMap pileOrderInfo = pileOrderMapDao.findOne("2020033123472571578");
// String timeShare = timeShareComputeStr(pileOrderInfo.getMultiChargeDegree());
// String chargeBeginTime = ToolDateTime.dateToDateString(pileOrderInfo.getChargeBeginTime());
// String chargeEndTime = ToolDateTime.dateToDateString(pileOrderInfo.getChargeEndTime());
//
// EquipmentChargeOrderDO orderInfo = new EquipmentChargeOrderDO();
// orderInfo.setChargeTimesDegree(timeShare);
// orderInfo.setChargeBeginTime(pileOrderInfo.getChargeBeginTime());
// orderInfo.setChargeEndTime(pileOrderInfo.getChargeEndTime());
// orderInfo.setChargeDuration(pileOrderInfo.getChargeDuration());
// orderInfo.setChargeBeginDegree(pileOrderInfo.getChargeBeginDegree());
// orderInfo.setChargeEndDegree(pileOrderInfo.getChargeEndDegree());
// orderInfo.setChargeDegree(pileOrderInfo.getChargeDegree());
// orderInfo.setChargeBeginSoc(Float.valueOf(String.valueOf(pileOrderInfo.getChargeBeginSoc())));
// orderInfo.setChargeEndSoc(Float.valueOf(String.valueOf(pileOrderInfo.getChargeEndSoc())));
// orderInfo.setChargeCurSoc(Float.valueOf(String.valueOf(pileOrderInfo.getChargeEndSoc())));
// orderInfo.setChargeStrategy(pileOrderInfo.getChargeStrategy());
// orderInfo.setChargeStrategyParam(pileOrderInfo.getChargeStrategyParam());
// orderInfo.setBootMode(pileOrderInfo.getBootMode());
// orderInfo.setChargeVin(pileOrderInfo.getChargeVin());
// orderInfo.setChargeAh((float)pileOrderInfo.getChargeDegree()*100);
// orderInfo.setFinishCode(44);
// orderInfo.setFinishType(2);
// orderInfo.setFinishMsg("补单结束");
// orderInfo.setFinishTime(ToolDateTime.getDate());
//
// System.out.println(chargeBeginTime);
// System.out.println(chargeEndTime);
// System.out.println(timeShare);
}
private String timeShareComputeStr(double[] chargeTimesDegree){
StringBuilder chargeTimesDegreeStr = new StringBuilder();
Stream.iterate(0, i->i+1).limit(chargeTimesDegree.length).forEach(i->{
chargeTimesDegreeStr.append(chargeTimesDegree[i]);
if (i < chargeTimesDegree.length - 1) {
chargeTimesDegreeStr.append(",");
}
});
return chargeTimesDegreeStr.toString();
}
@Test
public void companyChargeDailyStatisticsGenerateServiceTest() {
for (int i = 20; i < 31; i++) {
companyChargeDailyStatisticsGenerateService.generateByDateTransaction("2022-09-" + String.format("%02d", i));
}
}
}
Loading…
Cancel
Save