main
HuangHai 2 months ago
parent da4761f412
commit 6415820c1d

@ -1,70 +1,55 @@
package com.dsideal.Res.Milvus.Controller;
import com.dsideal.Config.PropKit;
import com.dsideal.Res.Plugin.MilvusPlugin;
import com.jfinal.core.Controller;
import com.jfinal.kit.Ret;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.QueryResults;
import io.milvus.param.R;
import io.milvus.param.collection.LoadCollectionParam;
import io.milvus.param.dml.QueryParam;
import io.milvus.response.QueryResultsWrapper;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.collection.request.LoadCollectionReq;
import io.milvus.v2.service.vector.request.QueryReq;
import io.milvus.v2.service.vector.response.QueryResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class MilvusDemoController extends Controller {
private final MilvusPlugin milvusPlugin = MilvusPlugin.getInstance();
private static final Logger logger = LoggerFactory.getLogger(MilvusDemoController.class);
public void index() {
MilvusClientV2 client = null;
try {
MilvusServiceClient client = milvusPlugin.getClient();
client = MilvusPlugin.getInstance().getClient();
String collectionName = PropKit.get("milvus.ms_collection_name");
// 加载集合
client.loadCollection(
LoadCollectionParam.newBuilder()
.withCollectionName(collectionName)
.build()
LoadCollectionReq.builder()
.collectionName(collectionName)
.build()
);
// 查询所有数据
QueryParam queryParam = QueryParam.newBuilder()
.withCollectionName(collectionName)
.withOutFields(Arrays.asList("id", "person_id", "user_input", "model_response", "timestamp", "embedding"))
.withExpr("") // 空表达式查询所有数据
.withLimit(1000L) // 设置最大返回记录数
.build();
R<QueryResults> response = client.query(queryParam);
if (response.getStatus() != R.Status.Success.getCode()) {
throw new RuntimeException("Milvus query failed: " + response.getMessage());
}
// 构建V2版本查询请求
QueryReq queryReq = QueryReq.builder()
.collectionName(collectionName)
.outputFields(Arrays.asList("id", "person_id", "user_input",
"model_response", "timestamp", "embedding"))
.filter("id >= 0")
.limit(1000L)
.build();
List<Map<String, Object>> results = new ArrayList<>();
QueryResultsWrapper wrapper = new QueryResultsWrapper(response.getData());
// 执行查询
QueryResp response = client.query(queryReq);
// 处理查询结果
List<QueryResultsWrapper.RowRecord> records = wrapper.getRowRecords();
for (QueryResultsWrapper.RowRecord record : records) {
Map<String, Object> row = new HashMap<>();
row.put("id", record.get("id"));
row.put("person_id", record.get("person_id"));
row.put("user_input", record.get("user_input"));
row.put("model_response", record.get("model_response"));
row.put("timestamp", record.get("timestamp"));
row.put("embedding", record.get("embedding"));
results.add(row);
}
// 返回查询结果
renderJson(Ret.ok("data", results));
// 处理结果...
} catch (Exception e) {
logger.error("Milvus查询失败", e);
renderJson(Ret.fail("msg", "查询失败:" + e.getMessage()));
logger.error("Milvus操作异常", e);
renderJson(Ret.fail("msg", "操作失败: " + e.getMessage()));
} finally {
if (client != null) {
MilvusPlugin.getInstance().returnClient(client);
}
}
}
}
}

@ -1,20 +1,22 @@
package com.dsideal.Res.Plugin;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.pool.PoolConfig;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
@Getter
public class MilvusPlugin {
private static MilvusPlugin instance;
private MilvusServiceClient client;
private static final Logger logger = LoggerFactory.getLogger(MilvusPlugin.class);
private MilvusPlugin() {}
private MilvusClientV2Pool pool;
private MilvusPlugin() {
}
public static MilvusPlugin getInstance() {
if (instance == null) {
@ -25,26 +27,44 @@ public class MilvusPlugin {
public void init(String host, int port, int maxConnections) {
try {
ConnectParam connectParam = ConnectParam.newBuilder()
.withHost(host)
.withPort(port)
.withConnectTimeout(10, TimeUnit.SECONDS)
.withKeepAliveTime(55, TimeUnit.SECONDS)
.withKeepAliveTimeout(20, TimeUnit.SECONDS)
.withIdleTimeout(180, TimeUnit.SECONDS)
ConnectConfig connectConfig = ConnectConfig.builder()
.uri("http://" + host + ":" + port)
//.token("root:Milvus") // replace this with your token
.build();
PoolConfig poolConfig = PoolConfig.builder()
.maxIdlePerKey(10) // max idle clients per key
.maxTotalPerKey(20) // max total(idle + active) clients per key
.maxTotal(100) // max total clients for all keys
.maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
.minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds
.build();
client = new MilvusServiceClient(connectParam);
logger.info("Milvus连接成功最大连接数{}", maxConnections);
pool = new MilvusClientV2Pool(poolConfig, connectConfig);
logger.info("Milvus连接池初始化成功最大连接数{}", maxConnections);
} catch (Exception e) {
logger.error("Milvus初始化失败: " + e.getMessage(), e);
throw new RuntimeException("Milvus初始化失败", e);
}
}
public void close() {
public MilvusClientV2 getClient() {
try {
return pool.getClient("client_name");
} catch (Exception e) {
logger.error("获取Milvus连接失败: " + e.getMessage(), e);
throw new RuntimeException("获取Milvus连接失败", e);
}
}
public void returnClient(MilvusClientV2 client) {
if (client != null) {
client.close();
pool.returnClient("client_name",client);
}
}
public void close() {
if (pool != null) {
pool.close();
}
}
}
Loading…
Cancel
Save