|
|
package DataexService
|
|
|
|
|
|
import (
|
|
|
"dsDataex/GenXorm/models"
|
|
|
"dsDataex/MyService/DataEX"
|
|
|
"dsDataex/MyService/DataEX/DataexDAO"
|
|
|
"dsDataex/MyService/MySwagger"
|
|
|
"dsDataex/Utils/CacheUtil"
|
|
|
"dsDataex/Utils/ES7Util"
|
|
|
"encoding/json"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
/**
|
|
|
* @Author zhangjun
|
|
|
* @Description 判断接入系统是否可以访问数据源
|
|
|
* @Date 2020-06-16 09:22
|
|
|
* @Param systemID string 接入系统id
|
|
|
* @Param datasource string 数据类型code
|
|
|
* @return bool 成功/失败
|
|
|
* @return string 结果
|
|
|
* @return *models 数据源实体指针
|
|
|
**/
|
|
|
func CheckDatasourceSet(systemID string,datasource string)(bool,string,*models.TDataexDatasource){
|
|
|
|
|
|
success,result,data,_:=DataexDAO.CheckDatasourceSet(systemID,datasource)
|
|
|
|
|
|
if success{
|
|
|
business := new(models.TDataexDatasource)
|
|
|
|
|
|
business.SystemId=systemID
|
|
|
business.DatasourceCode=datasource
|
|
|
business.Id=data["id"].(string)
|
|
|
business.SetFlag,_=strconv.Atoi(data["set_flag"].(string))
|
|
|
business.CollectFlag,_=strconv.Atoi(data["collect_flag"].(string))
|
|
|
business.ProvideType,_=strconv.Atoi(data["provide_type"].(string))
|
|
|
business.ProvideOrgid=data["provide_orgid"].(string)
|
|
|
business.DatastoreType,_=strconv.Atoi(data["datastore_type"].(string))
|
|
|
|
|
|
return true,"数据源类型验证成功",business
|
|
|
}else {
|
|
|
return false,result,nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// CheckDatasourceCollect currently performs no check: both arguments are
// ignored and the function always returns true with an empty message.
func CheckDatasourceCollect(systemID string, datasource string) (bool, string) {
	var (
		allowed = true
		message string
	)
	return allowed, message
}
|
|
|
|
|
|
func CheckDatasourceGet(systemID string,datasource string)(bool,string,*models.TDataexDataaccess){
|
|
|
success,result,data,_:=DataexDAO.CheckDatasourceGet(systemID,datasource)
|
|
|
|
|
|
if success{
|
|
|
business := new(models.TDataexDataaccess)
|
|
|
|
|
|
business.ConsumeSystemid=systemID
|
|
|
business.DatasourceCode=datasource
|
|
|
business.Id=data["id"].(string)
|
|
|
business.SetFlag,_=strconv.Atoi(data["set_flag"].(string))
|
|
|
business.QueryFlag,_=strconv.Atoi(data["query_flag"].(string))
|
|
|
business.ConsumeOrgid=data["consume_orgid"].(string)
|
|
|
business.ConsumeType,_=strconv.Atoi(data["consume_type"].(string))
|
|
|
|
|
|
return true,result,business
|
|
|
}else {
|
|
|
return false,result,nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* @Author zhangjun
|
|
|
* @Description 判断ES索引是否存在,不存在则创建一个索引
|
|
|
* @Date 2020-06-16 11:50
|
|
|
* @Param
|
|
|
* @return
|
|
|
**/
|
|
|
func CheckCreateIndex(esData *DataEX.ESData) (bool) {
|
|
|
|
|
|
flag,_:= ES7Util.IndexExist(esData.DatasourceId)
|
|
|
//创建 ES 索引
|
|
|
if flag==false {
|
|
|
ES7Util.IndexInit(esData.DatasourceId, esData.DataContent)
|
|
|
}
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* @Author zhangjun
|
|
|
* @Description 增加一个ES文档,测试逻辑删除、单条发送,不使用
|
|
|
* @Date 2020-06-16 16:03
|
|
|
* @Param
|
|
|
* @return
|
|
|
**/
|
|
|
func SetDocument(esData *DataEX.ESData) (bool,string) {
|
|
|
|
|
|
ES7Util.IndexDocDel(esData.DatasourceId,esData.DataId)
|
|
|
|
|
|
success,result,_:=ES7Util.IndexDocAdd(esData.DatasourceId,esData)
|
|
|
|
|
|
return success,result
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* @Author zhangjun
|
|
|
* @Description Set ES索引文档,单条发送,速度较慢,不使用
|
|
|
* @Date 2020-06-17 09:37
|
|
|
* @Param systemId string 接入系统id
|
|
|
* @Param datas 【】data 交换数据数组
|
|
|
* @Param datasource model 数据实体
|
|
|
* @return
|
|
|
**/
|
|
|
func DataexSet(systemID string, datas []MySwagger.Data,datasource *models.TDataexDatasource)(bool,string,[]string,[]MySwagger.FailResult) {
|
|
|
//开始 ES 相关数据操作!!!
|
|
|
if datasource.DatastoreType==1 || datasource.DatastoreType==2 {
|
|
|
|
|
|
var esData DataEX.ESData
|
|
|
|
|
|
esData.SystemId=systemID
|
|
|
esData.DatasourceId=datasource.DatasourceCode
|
|
|
esData.OrgId=datas[0].OrgID
|
|
|
esData.DelFlag=datas[0].DelFlag
|
|
|
esData.EnableFlag=1
|
|
|
esData.DataId=datas[0].DataID
|
|
|
|
|
|
var jsonData map[string]interface{}
|
|
|
json.Unmarshal([]byte(datas[0].Data), &jsonData)
|
|
|
|
|
|
esData.DataContent=jsonData
|
|
|
|
|
|
//判断ES索引是否存在,没有则创建ES索引
|
|
|
CheckCreateIndex(&esData)
|
|
|
|
|
|
//循环删除原索引文档
|
|
|
for no:=0;no< len(datas) && no<100 ;no++ {
|
|
|
|
|
|
esData.DatasourceId=datasource.DatasourceCode
|
|
|
esData.DataId=datas[no].DataID
|
|
|
|
|
|
ES7Util.IndexDocDel(esData.DatasourceId,esData.DataId)
|
|
|
}
|
|
|
|
|
|
var successIDs []string
|
|
|
var failResults []MySwagger.FailResult
|
|
|
|
|
|
//循环添加新的索引文档
|
|
|
for no:=0;no< len(datas) && no<100 ;no++{
|
|
|
var esData2 DataEX.ESData
|
|
|
|
|
|
esData2.SystemId=systemID
|
|
|
esData2.DatasourceId=datasource.DatasourceCode
|
|
|
esData2.EnableFlag=1
|
|
|
esData2.OrgId=datas[no].OrgID
|
|
|
esData2.DelFlag=datas[no].DelFlag
|
|
|
esData2.DataId=datas[no].DataID
|
|
|
|
|
|
var jsonData map[string]interface{}
|
|
|
json.Unmarshal([]byte(datas[no].Data), &jsonData)
|
|
|
|
|
|
esData2.DataContent=jsonData
|
|
|
|
|
|
//success,result:= SetDocument(&esData)
|
|
|
success,result,_:=ES7Util.IndexDocAdd(esData2.DatasourceId,&esData2)
|
|
|
|
|
|
if success{
|
|
|
successIDs=append(successIDs,esData2.DataId)
|
|
|
}else {
|
|
|
var failResult MySwagger.FailResult
|
|
|
|
|
|
failResult.FailID=esData2.DataId
|
|
|
failResult.FailReason=result
|
|
|
|
|
|
failResults=append(failResults,failResult)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return true,"",successIDs,failResults
|
|
|
}
|
|
|
|
|
|
//TODO:后续完善,暂不支持
|
|
|
//开始 Kafka 相关数据操作!!!
|
|
|
if datasource.DatastoreType==3 {
|
|
|
return false,"数据源类型配置不正确",nil,nil
|
|
|
}
|
|
|
|
|
|
return false,"数据源类型配置不正确",nil,nil
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* @Author zhangjun
|
|
|
* @Description Set索引文档,批量发送
|
|
|
* @Date 2020-06-17 09:37
|
|
|
* @Param systemId string 接入系统id
|
|
|
* @Param datas 【】data 交换数据数组
|
|
|
* @Param datasource model 数据实体
|
|
|
* @return bool 成功/失败
|
|
|
* @return string 结果
|
|
|
* @return 【】string 成功id数组
|
|
|
* @return 【】FailResult 失败结果数组
|
|
|
**/
|
|
|
func DataexSetBatch(systemID string, datas []MySwagger.Data,datasource *models.TDataexDatasource)(bool,string,[]string,[]MySwagger.FailResult) {
|
|
|
//add by zhangjun 2020-07-15
|
|
|
//处理组织机构树同步
|
|
|
if datasource.DatasourceCode=="org_school" && systemID=="BASE_GO"{
|
|
|
//change by zhangjun 2020-07-16
|
|
|
//OrgtreeProc(datas)
|
|
|
OrgtreeProcBatch(datas)
|
|
|
}
|
|
|
|
|
|
//开始 ES 相关数据操作!!!
|
|
|
if datasource.DatastoreType==1 || datasource.DatastoreType==2 {
|
|
|
|
|
|
var esData DataEX.ESData
|
|
|
|
|
|
esData.SystemId=systemID
|
|
|
esData.DatasourceId=datasource.DatasourceCode
|
|
|
esData.OrgId=datas[0].OrgID
|
|
|
esData.DelFlag=datas[0].DelFlag
|
|
|
esData.EnableFlag=1
|
|
|
esData.DataId=datas[0].DataID
|
|
|
|
|
|
var jsonData map[string]interface{}
|
|
|
json.Unmarshal([]byte(datas[0].Data), &jsonData)
|
|
|
|
|
|
esData.DataContent=jsonData
|
|
|
|
|
|
//一、判断ES索引是否存在,没有则创建ES索引
|
|
|
CheckCreateIndex(&esData)
|
|
|
|
|
|
//三、删除原索引文档
|
|
|
var dataIDs []string
|
|
|
|
|
|
for no:=0;no< len(datas) && no<1000 ;no++ {
|
|
|
|
|
|
//change by zhangjun 2020-07-15
|
|
|
//dataIDs = append(dataIDs, datas[no].DataID)
|
|
|
dataIDs = append(dataIDs, strings.ToLower(datas[no].DataID))
|
|
|
}
|
|
|
|
|
|
now :=time.Now()
|
|
|
|
|
|
ES7Util.IndexDocDelBatch(esData.DatasourceId,now,dataIDs)
|
|
|
|
|
|
//ES 索引批处理文档数组
|
|
|
var indexData []*DataEX.ESData
|
|
|
//数据校验失败 dataID数组
|
|
|
var failIDs []string
|
|
|
|
|
|
//四、循环添加索引文档
|
|
|
for no:=0;no< len(datas) && no<1000 ;no++{
|
|
|
var esData2 DataEX.ESData
|
|
|
|
|
|
esData2.SystemId=systemID
|
|
|
esData2.DatasourceId=datasource.DatasourceCode
|
|
|
esData2.EnableFlag=1
|
|
|
|
|
|
esData2.BeginTime = DataEX.JsonDate(now)
|
|
|
esData2.EndTime = DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location()))
|
|
|
|
|
|
esData2.DelFlag=datas[no].DelFlag
|
|
|
//change by zhangjun 2020-07-15
|
|
|
//esData2.OrgId=datas[no].OrgID
|
|
|
//esData2.DataId=datas[no].DataID
|
|
|
esData2.OrgId=strings.ToLower(datas[no].OrgID)
|
|
|
esData2.DataId=strings.ToLower(datas[no].DataID)
|
|
|
|
|
|
var jsonData map[string]interface{}
|
|
|
json.Unmarshal([]byte(datas[no].Data), &jsonData)
|
|
|
|
|
|
esData2.DataContent=jsonData
|
|
|
|
|
|
//数据校验:1、机构校验
|
|
|
flag,_,data,_:= DataexDAO.CheckProvideOrgID(datasource.ProvideType,datasource.ProvideOrgid,datas[no].OrgID)
|
|
|
|
|
|
if flag==true{
|
|
|
if data["province_id"] != nil {
|
|
|
esData2.ProvinceId=data["province_id"].(string)
|
|
|
esData2.ProvinceName=CacheUtil.GBT2260[data["province_id"].(string)]
|
|
|
}
|
|
|
if data["city_id"] != nil {
|
|
|
esData2.CityId=data["city_id"].(string)
|
|
|
esData2.CityName=CacheUtil.GBT2260[data["city_id"].(string)]
|
|
|
}
|
|
|
if data["area_id"] != nil {
|
|
|
esData2.AreaId=data["area_id"].(string)
|
|
|
esData2.AreaName=CacheUtil.GBT2260[data["area_id"].(string)]
|
|
|
}
|
|
|
|
|
|
//add by zhangjun 2020-06-29
|
|
|
if data["school_type"] !=nil{
|
|
|
esData2.SchoolType = data["school_type"].(string)
|
|
|
esData2.SchoolTypeName = CacheUtil.XXBXLXM[data["school_type"].(string)]
|
|
|
}
|
|
|
|
|
|
esData2.OrgName=data["org_name"].(string)
|
|
|
|
|
|
esData2.OrgType,_=strconv.Atoi(data["org_type"].(string))
|
|
|
|
|
|
indexData=append(indexData,&esData2)
|
|
|
}else {
|
|
|
failIDs=append(failIDs,datas[no].DataID)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if indexData==nil {
|
|
|
failIDs=nil
|
|
|
|
|
|
return false,"数据机构权限全部不正确",nil,nil
|
|
|
}
|
|
|
|
|
|
success, _, successIDs, failResults := ES7Util.IndexDocAddBatch(esData.DatasourceId, indexData)
|
|
|
//ES7Util.IndexDocDelBatch2(esData.DatasourceId,now,successIDs)
|
|
|
|
|
|
if len(failIDs)>0{
|
|
|
for no:=0;no< len(failIDs);no++{
|
|
|
failResults=append(failResults,MySwagger.FailResult{FailID: failIDs[no],FailReason: "数据机构ID校验不通过,无权操作此机构数据", })
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//五、处理错误数据,恢复删除文档
|
|
|
if len(failResults) > 0 {
|
|
|
var failIDs2 []string
|
|
|
|
|
|
for no:=0;no< len(failResults);no++{
|
|
|
failIDs2=append(failIDs2,failResults[no].FailID)
|
|
|
}
|
|
|
|
|
|
ES7Util.IndexDocDelRestore(esData.DatasourceId,now,failIDs2)
|
|
|
}
|
|
|
|
|
|
if success{
|
|
|
|
|
|
return true,"文档添加成功",successIDs,failResults
|
|
|
}else {
|
|
|
|
|
|
return false,"文档添加失败",successIDs,failResults
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//TODO:后续完善,暂不支持
|
|
|
//开始 Kafka 相关数据操作!!!
|
|
|
if datasource.DatastoreType==3 {
|
|
|
return false,"数据源类型配置不正确",nil,nil
|
|
|
}
|
|
|
|
|
|
return false,"数据源类型配置不正确",nil,nil
|
|
|
}
|
|
|
|
|
|
func DataexGet(datasourceCode string,consumeType int,consumeOrgID string,orgID string,dataID string)(bool,string,[]MySwagger.Data){
|
|
|
|
|
|
flag,result,_:=DataexDAO.CheckConsumeOrgID(consumeType,consumeOrgID,orgID)
|
|
|
|
|
|
if flag{
|
|
|
|
|
|
var lstData []MySwagger.Data
|
|
|
var data MySwagger.Data
|
|
|
|
|
|
flag2,result2,esdata:= ES7Util.IndexDocGet(datasourceCode,dataID)
|
|
|
|
|
|
if flag2{
|
|
|
data.DataID=esdata.DataId
|
|
|
data.OrgID=esdata.OrgId
|
|
|
data.DelFlag=esdata.DelFlag
|
|
|
|
|
|
bytes,_:=json.Marshal(esdata.DataContent)
|
|
|
data.Data=string(bytes)
|
|
|
|
|
|
lstData=append(lstData,data)
|
|
|
|
|
|
flag,result,_=DataexDAO.CheckConsumeOrgID(consumeType,consumeOrgID,data.OrgID)
|
|
|
|
|
|
if flag{
|
|
|
return true,"获取数据成功",lstData
|
|
|
}else {
|
|
|
return false,result,nil
|
|
|
}
|
|
|
}else {
|
|
|
|
|
|
return false,result2,nil
|
|
|
}
|
|
|
}else {
|
|
|
|
|
|
return flag,result,nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func DataexPage(datasourceCode string,consumeType int,consumeOrgID string,orgID string,pageNO int,beginTime string)(bool,string,[]MySwagger.Data) {
|
|
|
|
|
|
flag, result, _ := DataexDAO.CheckConsumeOrgID(consumeType, consumeOrgID, orgID)
|
|
|
|
|
|
if flag{
|
|
|
var lstEsdata []DataEX.ESData
|
|
|
|
|
|
if orgID=="-1"{
|
|
|
|
|
|
flag,result,lstEsdata= ES7Util.IndexDocPage(datasourceCode,nil,pageNO,beginTime)
|
|
|
}else {
|
|
|
orgIDs:=DataexDAO.GetDatasourceOrgIDS(consumeType,orgID)
|
|
|
|
|
|
flag,result,lstEsdata= ES7Util.IndexDocPage(datasourceCode,orgIDs,pageNO,beginTime)
|
|
|
}
|
|
|
|
|
|
if flag{
|
|
|
var lstData []MySwagger.Data
|
|
|
var data MySwagger.Data
|
|
|
|
|
|
for no:=0;no< len(lstEsdata);no++{
|
|
|
data.DataID=lstEsdata[no].DataId;
|
|
|
data.OrgID=lstEsdata[no].OrgId
|
|
|
data.DelFlag=lstEsdata[no].DelFlag
|
|
|
|
|
|
bytes,_:=json.Marshal(lstEsdata[no].DataContent)
|
|
|
data.Data=string(bytes)
|
|
|
|
|
|
lstData=append(lstData,data)
|
|
|
}
|
|
|
|
|
|
return true,"获取数据成功",lstData
|
|
|
}else {
|
|
|
return false,result,nil
|
|
|
}
|
|
|
return flag,result,nil
|
|
|
}else {
|
|
|
return false,result,nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func DataexQuery(datasourceCode string,consumeType int,consumeOrgID string,orgID string,pageNO int,conditions []string)(bool,string,[]MySwagger.Data) {
|
|
|
|
|
|
flag, result, _ := DataexDAO.CheckConsumeOrgID(consumeType, consumeOrgID, orgID)
|
|
|
|
|
|
if flag{
|
|
|
var lstEsdata []DataEX.ESData
|
|
|
|
|
|
if orgID=="-1"{
|
|
|
|
|
|
flag,result,lstEsdata= ES7Util.IndexDocQuery(datasourceCode,nil,pageNO,conditions)
|
|
|
}else {
|
|
|
orgIDs:=DataexDAO.GetDatasourceOrgIDS(consumeType,orgID)
|
|
|
|
|
|
flag,result,lstEsdata= ES7Util.IndexDocQuery(datasourceCode,orgIDs,pageNO,conditions)
|
|
|
}
|
|
|
|
|
|
if flag{
|
|
|
var lstData []MySwagger.Data
|
|
|
var data MySwagger.Data
|
|
|
|
|
|
for no:=0;no< len(lstEsdata);no++{
|
|
|
data.DataID=lstEsdata[no].DataId;
|
|
|
data.OrgID=lstEsdata[no].OrgId
|
|
|
data.DelFlag=lstEsdata[no].DelFlag
|
|
|
|
|
|
bytes,_:=json.Marshal(lstEsdata[no].DataContent)
|
|
|
data.Data=string(bytes)
|
|
|
|
|
|
lstData=append(lstData,data)
|
|
|
}
|
|
|
|
|
|
return true,"获取数据成功",lstData
|
|
|
}else {
|
|
|
return false,result,nil
|
|
|
}
|
|
|
return flag,result,nil
|
|
|
}else {
|
|
|
return false,result,nil
|
|
|
}
|
|
|
} |