main
HuangHai 1 month ago
parent fe90744717
commit 757d47fc39

File diff suppressed because one or more lines are too long

@ -16,8 +16,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!--JFinal版本号--> <!--JFinal版本号-->
<jfinal.version>5.2.3</jfinal.version> <jfinal.version>5.2.4</jfinal.version>
<jfinal-undertow.version>3.6</jfinal-undertow.version> <jfinal-undertow.version>3.7</jfinal-undertow.version>
<jfinal-cos.version>2022.2</jfinal-cos.version> <jfinal-cos.version>2022.2</jfinal-cos.version>
<!--HikariCP版本号--> <!--HikariCP版本号-->
<HikariCP.version>5.1.0</HikariCP.version> <HikariCP.version>5.1.0</HikariCP.version>

@ -7,6 +7,7 @@ import com.jfinal.aop.Before;
import com.jfinal.core.Controller; import com.jfinal.core.Controller;
import com.jfinal.ext.interceptor.GET; import com.jfinal.ext.interceptor.GET;
import com.jfinal.kit.PathKit; import com.jfinal.kit.PathKit;
import com.jfinal.kit.SseEmitter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
@ -26,125 +27,51 @@ public class YunNanAiController extends Controller {
* http://10.10.21.20:9200/dsBase/ai/compareShiZhou?shiZhouA=文山州&shiZhouB=楚雄州 * http://10.10.21.20:9200/dsBase/ai/compareShiZhou?shiZhouA=文山州&shiZhouB=楚雄州
*/ */
@Before({GET.class}) @Before({GET.class})
public void compareShiZhou(String shiZhouA, String shiZhouB) { public void compareShiZhou(String shiZhouA, String shiZhouB) throws InterruptedException {
// 设置SSE响应头 //创建sse
getResponse().setContentType("text/event-stream"); SseEmitter sseEmitter = new SseEmitter(getResponse());
getResponse().setCharacterEncoding("UTF-8"); //主线程里拿到数据推送消息
getResponse().setHeader("Cache-Control", "no-cache"); String msg = "111 222 333 444 555 666 777";
getResponse().setHeader("Connection", "keep-alive"); String[] datas = msg.split(" ");
getResponse().setHeader("Access-Control-Allow-Origin", "*"); for (String data : datas) {
getResponse().setHeader("Access-Control-Allow-Headers", "Cache-Control"); //推送
sseEmitter.sendMessage(data);
try { Thread.sleep(50);
PrintWriter writer = getResponse().getWriter(); }
//sse完成
String[] regions = {shiZhouA, shiZhouB}; sseEmitter.complete();
// 第一步:数据获取 //结束response
String dataContent = ym.collectEducationData(regions); renderNull();
String analysisPrompt = ym.createAnalysisPrompt(dataContent, regions); }
// 生成输出文件名相关信息
String timestamp = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date());
String regionName = String.join("与", regions);
String uploadPath = PathKit.getWebRootPath() + "/upload";
// 发送开始事件
writer.write("data: {\"type\":\"start\",\"message\":\"开始分析数据...\"}\n\n");
writer.flush();
// 第二步AI分析并生成Word文档
CallDeepSeek.callDeepSeekStream(analysisPrompt, new CallDeepSeek.SSEListener() {
private StringBuilder fullResponse = new StringBuilder();
@Override
public void onData(String data) {
try {
fullResponse.append(data);
// 发送分析数据
String escapedData = data.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r");
writer.write("data: {\"type\":\"analysis\",\"content\":\"" + escapedData + "\"}\n\n");
writer.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onComplete(String response) {
try {
// 发送分析完成事件
writer.write("data: {\"type\":\"analysis_complete\",\"message\":\"分析完成,开始生成文档...\"}\n\n");
writer.flush();
// 使用完整的分析结果
String analysisResult = fullResponse.toString();
// 生成Word文档
String wordOutputPath = uploadPath + "/" + regionName + "_教育分析报告_" + timestamp + ".docx";
String wordFilePath = WordGenerator.generateWordDocument(analysisResult, wordOutputPath, regions);
// 构建返回的文件信息
Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "报告生成完成");
result.put("wordFile", "/upload/" + regionName + "_教育分析报告_" + timestamp + ".docx");
result.put("wordFilePath", wordFilePath);
result.put("timestamp", timestamp);
result.put("regions", regions);
// 发送完成事件和文件地址
String resultJson = com.jfinal.kit.JsonKit.toJson(result);
writer.write("data: {\"type\":\"complete\",\"result\":" + resultJson + "}\n\n");
writer.flush();
// 发送结束标记
writer.write("data: [DONE]\n\n");
writer.flush();
// 最后使用renderJson返回结果用于非SSE客户端
renderJson(result);
} catch (Exception e) {
try {
Map<String, Object> errorResult = new HashMap<>();
errorResult.put("success", false);
errorResult.put("message", "生成文档时出错: " + e.getMessage());
String errorJson = com.jfinal.kit.JsonKit.toJson(errorResult);
writer.write("data: {\"type\":\"error\",\"result\":" + errorJson + "}\n\n");
writer.flush();
renderJson(errorResult);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
@Override /**
public void onError(String error) { * 线
*
* http://10.10.21.20:9200/dsBase/ai/sse2
*/
public void sse2(){
//创建sse
final SseEmitter sseEmitter = new SseEmitter(getResponse());
String msg = "111 222 333 444 555 666 777";
//模拟chatgptapi 异步调用返回
new Thread(new Runnable() {
public void run() {
String[] datas = msg.split("");
for(String data:datas){
//推送 异步子线程里推送数据给客户端
sseEmitter.sendMessage(data);
try { try {
Map<String, Object> errorResult = new HashMap<>(); Thread.sleep(50);
errorResult.put("success", false); } catch (InterruptedException e) {
errorResult.put("message", "DeepSeek分析出错: " + error); throw new RuntimeException(e);
String errorJson = com.jfinal.kit.JsonKit.toJson(errorResult);
writer.write("data: {\"type\":\"error\",\"result\":" + errorJson + "}\n\n");
writer.flush();
renderJson(errorResult);
} catch (Exception e) {
e.printStackTrace();
} }
} }
}); //完成sse 告知主线程已经完成
sseEmitter.complete();
} catch (IOException e) { }
// 如果SSE失败降级为普通JSON响应 }).start();
Map<String, Object> errorResult = new HashMap<>(); //锁住等待子线程推送完成
errorResult.put("success", false); sseEmitter.waiting(100);
errorResult.put("message", "SSE连接失败: " + e.getMessage()); renderNull();
renderJson(errorResult);
}
} }
} }

@ -39,14 +39,20 @@ public class CallDeepSeek {
new Thread(() -> { new Thread(() -> {
StringBuilder fullResponse = new StringBuilder(); StringBuilder fullResponse = new StringBuilder();
try { try {
System.out.println("开始调用DeepSeek API...");
JSONObject jsonPayload = createRequestPayload(prompt); JSONObject jsonPayload = createRequestPayload(prompt);
System.out.println("请求载荷: " + jsonPayload.toString());
HttpClient client = createHttpClient(); HttpClient client = createHttpClient();
java.net.http.HttpRequest request = createHttpRequest(jsonPayload); java.net.http.HttpRequest request = createHttpRequest(jsonPayload);
System.out.println("发送HTTP请求到: " + API_URL);
CompletableFuture<Void> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofLines()) CompletableFuture<Void> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
.thenAccept(response -> { .thenAccept(response -> {
System.out.println("收到响应,状态码: " + response.statusCode());
handleStreamResponse(response, fullResponse, listener, outputPath, saveToFile); handleStreamResponse(response, fullResponse, listener, outputPath, saveToFile);
}).exceptionally(e -> { }).exceptionally(e -> {
System.err.println("请求异常: " + e.getMessage());
listener.onError("请求或处理异常: " + e.getMessage()); listener.onError("请求或处理异常: " + e.getMessage());
e.printStackTrace(); e.printStackTrace();
return null; return null;
@ -54,6 +60,7 @@ public class CallDeepSeek {
future.join(); future.join();
} catch (Exception e) { } catch (Exception e) {
System.err.println("发生意外错误: " + e.getMessage());
listener.onError("发生意外错误: " + e.getMessage()); listener.onError("发生意外错误: " + e.getMessage());
e.printStackTrace(); e.printStackTrace();
} }
@ -107,10 +114,14 @@ public class CallDeepSeek {
SSEListener listener, SSEListener listener,
String outputPath, String outputPath,
boolean saveToFile) { boolean saveToFile) {
System.out.println("处理流式响应,状态码: " + response.statusCode());
if (response.statusCode() == 200) { if (response.statusCode() == 200) {
System.out.println("开始处理SSE数据流...");
response.body().forEach(line -> { response.body().forEach(line -> {
System.out.println("收到SSE行: " + line);
if (line.startsWith("data:")) { if (line.startsWith("data:")) {
String data = line.substring(5).trim(); String data = line.substring(5).trim();
System.out.println("解析SSE数据: " + data);
if (!data.equals("[DONE]")) { if (!data.equals("[DONE]")) {
try { try {
JSONObject jsonData = JSONUtil.parseObj(data); JSONObject jsonData = JSONUtil.parseObj(data);
@ -120,19 +131,24 @@ public class CallDeepSeek {
.getJSONObject("delta") .getJSONObject("delta")
.getStr("content", ""); .getStr("content", "");
if (content != null && !content.isEmpty()) { if (content != null && !content.isEmpty()) {
System.out.println("提取到内容: " + content);
fullResponse.append(content); fullResponse.append(content);
listener.onData(content); listener.onData(content);
} }
} }
} catch (Exception e) { } catch (Exception e) {
System.err.println("解析SSE JSON数据错误: " + data + " \nError: " + e.getMessage()); System.err.println("解析SSE JSON数据错误: " + data + " \nError: " + e.getMessage());
e.printStackTrace();
} }
} else {
System.out.println("收到结束标记 [DONE]");
} }
} }
}); });
// 流结束后的处理 // 流结束后的处理
String responseContent = fullResponse.toString(); String responseContent = fullResponse.toString();
System.out.println("流处理完成,总内容长度: " + responseContent.length());
if (saveToFile && outputPath != null) { if (saveToFile && outputPath != null) {
FileUtil.writeString(responseContent, new File(outputPath), "UTF-8"); FileUtil.writeString(responseContent, new File(outputPath), "UTF-8");
listener.onComplete("内容已成功保存到" + outputPath); listener.onComplete("内容已成功保存到" + outputPath);
@ -140,6 +156,7 @@ public class CallDeepSeek {
listener.onComplete(responseContent); listener.onComplete(responseContent);
} }
} else { } else {
System.err.println("API请求失败状态码: " + response.statusCode());
listener.onError("API请求失败: " + response.statusCode() + " Body: " + response.body().toString()); listener.onError("API请求失败: " + response.statusCode() + " Body: " + response.body().toString());
} }
} }

Loading…
Cancel
Save