package DataexService import ( "dsDataex/GenXorm/models" "dsDataex/MyService/DataEX" "dsDataex/MyService/DataEX/DataexDAO" "dsDataex/MyService/MySwagger" "dsDataex/Utils/CacheUtil" "dsDataex/Utils/CommonUtil" "dsDataex/Utils/ES7Util" "dsDataex/Utils/KafkaUtil" "encoding/json" "fmt" "strconv" "strings" "time" ) /** * @Author zhangjun * @Description 判断接入系统是否可以访问数据源 * @Date 2020-06-16 09:22 * @Param systemID string 接入系统id * @Param datasource string 数据类型code * @return bool 成功/失败 * @return string 结果 * @return *models 数据源实体指针 **/ func CheckDatasourceSet(systemID string,datasource string)(bool,string,*models.TDataexDatasource){ success,result,data,_:=DataexDAO.CheckDatasourceSet(systemID,datasource) if success{ business := new(models.TDataexDatasource) business.SystemId=systemID business.DatasourceCode=datasource business.Id=data["id"].(string) business.SetFlag,_=strconv.Atoi(data["set_flag"].(string)) business.CollectFlag,_=strconv.Atoi(data["collect_flag"].(string)) business.ProvideType,_=strconv.Atoi(data["provide_type"].(string)) business.ProvideOrgid=data["provide_orgid"].(string) business.DatastoreType,_=strconv.Atoi(data["datastore_type"].(string)) return true,"数据源类型验证成功",business }else { return false,result,nil } } func CheckDatasourceSql(systemID string,queryID string)(bool,string){ success,_,_,_:=DataexDAO.CheckDatasourceSql(systemID,queryID) if success{ return true,"接入系统ES-SQL查询权限验证成功" }else { return false,"接入系统ES-SQL查询权限验证失败" } } func CheckDatasourceCollect(systemID string,datasource string)(bool,string){ return true,"" } func CheckDatasourceGet(systemID string,datasource string)(bool,string,*models.TDataexDataaccess){ success,result,data,_:=DataexDAO.CheckDatasourceGet(systemID,datasource) if success{ business := new(models.TDataexDataaccess) business.ConsumeSystemid=systemID business.DatasourceCode=datasource business.Id=data["id"].(string) business.SetFlag,_=strconv.Atoi(data["set_flag"].(string)) business.QueryFlag,_=strconv.Atoi(data["query_flag"].(string)) business.ConsumeOrgid=data["consume_orgid"].(string) business.ConsumeType,_=strconv.Atoi(data["consume_type"].(string)) return true,result,business }else { return false,result,nil } } /** * @Author zhangjun * @Description 判断ES索引是否存在,不存在则创建一个索引 * @Date 2020-06-16 11:50 * @Param * @return **/ func CheckCreateIndex(esData *DataEX.ESData) (bool) { flag,_:= ES7Util.IndexExist(esData.DatasourceId) //创建 ES 索引 if flag==false { ES7Util.IndexInit(esData.DatasourceId, esData.DataContent) } return true } /** * @Author zhangjun * @Description 增加一个ES文档,测试逻辑删除、单条发送,不使用 * @Date 2020-06-16 16:03 * @Param * @return **/ func SetDocument(esData *DataEX.ESData) (bool,string) { ES7Util.IndexDocDel(esData.DatasourceId,esData.DataId) success,result,_:=ES7Util.IndexDocAdd(esData.DatasourceId,esData) return success,result } /** * @Author zhangjun * @Description Set ES索引文档,单条发送,速度较慢,不使用 * @Date 2020-06-17 09:37 * @Param systemId string 接入系统id * @Param datas 【】data 交换数据数组 * @Param datasource model 数据实体 * @return **/ func DataexSet(systemID string, datas []MySwagger.Data,datasource *models.TDataexDatasource)(bool,string,[]string,[]MySwagger.FailResult) { //开始 ES 相关数据操作!!! if datasource.DatastoreType==1 || datasource.DatastoreType==2 { var esData DataEX.ESData esData.SystemId=systemID esData.DatasourceId=datasource.DatasourceCode esData.OrgId=datas[0].OrgID esData.DelFlag=datas[0].DelFlag esData.EnableFlag=1 esData.DataId=datas[0].DataID var jsonData map[string]interface{} json.Unmarshal([]byte(datas[0].Data), &jsonData) esData.DataContent=jsonData //判断ES索引是否存在,没有则创建ES索引 CheckCreateIndex(&esData) //循环删除原索引文档 for no:=0;no< len(datas) && no<100 ;no++ { esData.DatasourceId=datasource.DatasourceCode esData.DataId=datas[no].DataID ES7Util.IndexDocDel(esData.DatasourceId,esData.DataId) } var successIDs []string var failResults []MySwagger.FailResult //循环添加新的索引文档 for no:=0;no< len(datas) && no<100 ;no++{ var esData2 DataEX.ESData esData2.SystemId=systemID esData2.DatasourceId=datasource.DatasourceCode esData2.EnableFlag=1 esData2.OrgId=datas[no].OrgID esData2.DelFlag=datas[no].DelFlag esData2.DataId=datas[no].DataID var jsonData map[string]interface{} json.Unmarshal([]byte(datas[no].Data), &jsonData) esData2.DataContent=jsonData //success,result:= SetDocument(&esData) success,result,_:=ES7Util.IndexDocAdd(esData2.DatasourceId,&esData2) if success{ successIDs=append(successIDs,esData2.DataId) }else { var failResult MySwagger.FailResult failResult.FailID=esData2.DataId failResult.FailReason=result failResults=append(failResults,failResult) } } return true,"",successIDs,failResults } //TODO:后续完善,暂不支持 //开始 Kafka 相关数据操作!!! if datasource.DatastoreType==3 { return false,"数据源类型配置不正确",nil,nil } return false,"数据源类型配置不正确",nil,nil } func DataexCollect(systemID string,users []MySwagger.User,events []MySwagger.Event,datasource *models.TDataexDatasource,ip string) (bool,string) { defer func(){ if err:=recover();err!=nil{ fmt.Println("DataexCollect Panic Recover :",err) } }() var now = time.Now().Format("2006/01/02 15:04:05") var userInfo = make( map[string]MySwagger.User) var accessInfo = make( map[string]MySwagger.User) if len(users)>0{ for no:=0;no< len(users);no++{ if users[no].UserID==""{ _,flag:= accessInfo[users[no].AccessID] if flag==false{ accessInfo[users[no].AccessID]=users[no] } }else { _,flag:= userInfo[users[no].UserID] if flag==false{ userInfo[users[no].UserID]=users[no] } } } } var datas []DataEX.KafkaData for no:=0;no< len(events);no++{ var data DataEX.KafkaData if events[no].EventUserID==""{ if len(users)>0{ data.UserID=users[0].UserID data.AccessID=users[0].AccessID data.Identity=users[0].Identity data.AccessWay=users[0].AccessWay data.AccessIP=users[0].AccessIP var jsonData map[string]interface{} json.Unmarshal([]byte(users[0].Properties), &jsonData) data.UserProperty =jsonData } }else { _,flag:= userInfo[events[no].EventUserID] if flag==true{ data.UserID=userInfo[events[no].EventUserID].UserID data.AccessID=userInfo[events[no].EventUserID].AccessID data.Identity=userInfo[events[no].EventUserID].Identity data.AccessWay=userInfo[events[no].EventUserID].AccessWay data.AccessIP=userInfo[events[no].EventUserID].AccessIP var jsonData map[string]interface{} json.Unmarshal([]byte(userInfo[events[no].EventUserID].Properties), &jsonData) data.UserProperty =jsonData } _,flag2:= accessInfo[events[no].EventUserID] if flag2==true{ data.UserID=accessInfo[events[no].EventUserID].UserID data.AccessID=accessInfo[events[no].EventUserID].AccessID data.Identity=accessInfo[events[no].EventUserID].Identity data.AccessWay=accessInfo[events[no].EventUserID].AccessWay data.AccessIP=accessInfo[events[no].EventUserID].AccessIP var jsonData map[string]interface{} json.Unmarshal([]byte(accessInfo[events[no].EventUserID].Properties), &jsonData) data.UserProperty =jsonData } } data.EventType=events[no].EventType data.EventName=events[no].EventName data.EventTime=events[no].EventTime data.EventURI=events[no].EventURI data.EventSeqNO=events[no].EventSeqNO var jsonData map[string]interface{} json.Unmarshal([]byte(events[no].Properties), &jsonData) data.EventProperty=jsonData data.SystemId=systemID data.DatasourceId=datasource.DatasourceCode data.CollectTime=now //add by zhangjun 2020-08-02 data.DataId=CommonUtil.GetUUID() if data.AccessIP==""{ data.AccessIP=ip } datas=append(datas,data) } //flag,result:=KafkaUtil.Provide(datasource.DatasourceCode,datas) flag,result:=KafkaUtil.ProvideLow(datasource.DatasourceCode,datas) if flag==true{ return true,"数据汇集成功" }else { return false,result } } /** * @Author zhangjun * @Description Set索引文档,批量发送 * @Date 2020-06-17 09:37 * @Param systemId string 接入系统id * @Param datas 【】data 交换数据数组 * @Param datasource model 数据实体 * @return bool 成功/失败 * @return string 结果 * @return 【】string 成功id数组 * @return 【】FailResult 失败结果数组 **/ func DataexSetBatch(systemID string, datas []MySwagger.Data,datasource *models.TDataexDatasource)(bool,string,[]string,[]MySwagger.FailResult) { //add by zhangjun 2020-07-15 //处理组织机构树同步 if datasource.DatasourceCode=="org_school" && systemID=="BASE_GO"{ OrgtreeProcBatch(datas) } if datasource.DatasourceCode=="org_school_lua" && systemID=="BASE_LUA"{ LuaOrgtreeProcBatch(datas) } if datasource.DatasourceCode=="org_school_java" && systemID=="BASE_JAVA"{ JavaOrgtreeProcBatch(datas) } //开始 ES 相关数据操作!!! if datasource.DatastoreType==1 || datasource.DatastoreType==2 { var esData DataEX.ESData esData.SystemId=systemID esData.DatasourceId=datasource.DatasourceCode esData.OrgId=datas[0].OrgID esData.DelFlag=datas[0].DelFlag esData.EnableFlag=1 esData.DataId=datas[0].DataID var jsonData map[string]interface{} json.Unmarshal([]byte(datas[0].Data), &jsonData) esData.DataContent=jsonData //一、判断ES索引是否存在,没有则创建ES索引 CheckCreateIndex(&esData) //三、删除原索引文档 var dataIDs []string for no:=0;no< len(datas) && no<1000 ;no++ { //change by zhangjun 2020-07-15 dataIDs = append(dataIDs, datas[no].DataID) //dataIDs = append(dataIDs, strings.ToLower(datas[no].DataID)) } now :=time.Now() ES7Util.IndexDocDelBatch(esData.DatasourceId,now,dataIDs) //ES 索引批处理文档数组 var indexData []*DataEX.ESData //数据校验失败 dataID数组 var failIDs []string var dataContentFailIDs []map[string]string //add by wangshuai //_, datasourceId := ValidationUtil.GetDatasourceIdByCode(datasource.DatasourceCode) //四、循环添加索引文档 for no:=0;no< len(datas) && no<1000 ;no++{ var esData2 DataEX.ESData esData2.SystemId=systemID esData2.DatasourceId=datasource.DatasourceCode esData2.EnableFlag=1 esData2.BeginTime = DataEX.JsonDate(now) esData2.EndTime = DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location())) esData2.DelFlag=datas[no].DelFlag //change by zhangjun 2020-07-15 esData2.OrgId=datas[no].OrgID esData2.DataId=datas[no].DataID //esData2.OrgId=strings.ToLower(datas[no].OrgID) //esData2.DataId=strings.ToLower(datas[no].DataID) var jsonData map[string]interface{} json.Unmarshal([]byte(datas[no].Data), &jsonData) esData2.DataContent=jsonData //re, datasourceId := ValidationUtil.GetDatasourceIdByCode(esData2.DatasourceId) //var r = false //if re == true { // r, _ = ValidationUtil.ValidESDataContent(datasourceId, esData2.DataContent) //} //数据校验:1、机构校验 flag,_,data,_:= DataexDAO.CheckProvideOrgID(datasource.ProvideType,datasource.ProvideOrgid,datas[no].OrgID) //fmt.Println("flag::", flag) if flag==true { //result, failMessages := ValidationUtil.ValidESDataContent(datasourceId, esData2.DataContent) var result=true var failMessages="" if result == true { if data["province_id"] != nil { esData2.ProvinceId=data["province_id"].(string) esData2.ProvinceName=CacheUtil.GBT2260[data["province_id"].(string)] } if data["city_id"] != nil { esData2.CityId=data["city_id"].(string) esData2.CityName=CacheUtil.GBT2260[data["city_id"].(string)] } if data["area_id"] != nil { esData2.AreaId=data["area_id"].(string) esData2.AreaName=CacheUtil.GBT2260[data["area_id"].(string)] } //add by zhangjun 2020-09-07 if data["area_id2"] != nil { esData2.Area2Id=data["area_id2"].(string) esData2.Area2Name=CacheUtil.GBT2260[data["area_id2"].(string)] } if data["area_id3"] != nil { esData2.Area3Id=data["area_id3"].(string) esData2.Area3Name=CacheUtil.GBT2260[data["area_id3"].(string)] } //add by zhangjun 2020-06-29 if data["school_type"] !=nil{ esData2.SchoolType = data["school_type"].(string) esData2.SchoolTypeName = CacheUtil.XXBXLXM[data["school_type"].(string)] } esData2.OrgName=data["org_name"].(string) esData2.OrgType,_=strconv.Atoi(data["org_type"].(string)) indexData=append(indexData,&esData2) } else { fails := make(map[string]string) fails["fail_id"] = datas[no].DataID fails["fail_msg"] = failMessages dataContentFailIDs=append(dataContentFailIDs,fails) } } else { failIDs=append(failIDs,datas[no].DataID) } } if indexData==nil { failIDs=nil return false,"数据机构权限全部不正确或验证不通过",nil,nil } success, _, successIDs, failResults := ES7Util.IndexDocAddBatch(esData.DatasourceId, indexData) //ES7Util.IndexDocDelBatch2(esData.DatasourceId,now,successIDs) if len(failIDs)>0{ for no:=0;no< len(failIDs);no++{ failResults=append(failResults,MySwagger.FailResult{FailID: failIDs[no],FailReason: "数据机构ID校验不通过,无权操作此机构数据", }) } } if len(dataContentFailIDs)>0{ for no:=0;no< len(dataContentFailIDs);no++{ failResults=append(failResults,MySwagger.FailResult{FailID: dataContentFailIDs[no]["fail_id"],FailReason: dataContentFailIDs[no]["fail_msg"], }) } } //五、处理错误数据,恢复删除文档 if len(failResults) > 0 { var failIDs2 []string for no:=0;no< len(failResults);no++{ failIDs2=append(failIDs2,failResults[no].FailID) } ES7Util.IndexDocDelRestore(esData.DatasourceId,now,failIDs2) DataexDAO.SaveDataError(failResults, datas, systemID, datasource.DatasourceCode ) } if success{ return true,"文档添加成功",successIDs,failResults }else { return false,"文档添加失败",successIDs,failResults } } //TODO:后续完善,暂不支持 //开始 Kafka 相关数据操作!!! if datasource.DatastoreType==3 { return false,"数据源类型配置不正确",nil,nil } return false,"数据源类型配置不正确",nil,nil } func DataexGet(datasourceCode string,consumeType int,consumeOrgID string,orgID string,dataID string)(bool,string,[]MySwagger.Data){ flag,result,_:=DataexDAO.CheckConsumeOrgID(consumeType,consumeOrgID,orgID) if flag{ var lstData []MySwagger.Data var data MySwagger.Data flag2,result2,esdata:= ES7Util.IndexDocGet(datasourceCode,dataID) if flag2{ data.DataID=esdata.DataId data.OrgID=esdata.OrgId data.DelFlag=esdata.DelFlag bytes,_:=json.Marshal(esdata.DataContent) data.Data=string(bytes) lstData=append(lstData,data) flag,result,_=DataexDAO.CheckConsumeOrgID(consumeType,consumeOrgID,data.OrgID) if flag{ fmt.Println("lstData", lstData) return true,"获取数据成功",lstData }else { return false,result,nil } }else { return false,result2,nil } }else { return flag,result,nil } } func DataexPage(datasourceCode string,consumeType int,consumeOrgID string,orgID string,pageNO int,beginTime string)(bool,string,[]MySwagger.Data) { flag, result, _ := DataexDAO.CheckConsumeOrgID(consumeType, consumeOrgID, orgID) if flag{ var lstEsdata []DataEX.ESData //add by zhangjun 2020-08-02 beginTime =strings.Replace(beginTime,"-","/",-1) if orgID=="-1"{ flag,result,lstEsdata= ES7Util.IndexDocPage(datasourceCode,nil,pageNO,beginTime) }else { orgIDs:=DataexDAO.GetDatasourceOrgIDS(consumeType,orgID) flag,result,lstEsdata= ES7Util.IndexDocPage(datasourceCode,orgIDs,pageNO,beginTime) } if flag{ var lstData []MySwagger.Data var data MySwagger.Data for no:=0;no< len(lstEsdata);no++{ data.DataID=lstEsdata[no].DataId; data.OrgID=lstEsdata[no].OrgId data.DelFlag=lstEsdata[no].DelFlag bytes,_:=json.Marshal(lstEsdata[no].DataContent) data.Data=string(bytes) lstData=append(lstData,data) } return true,"获取数据成功",lstData }else { return false,result,nil } return flag,result,nil }else { return false,result,nil } } func DataexQuery(datasourceCode string,consumeType int,consumeOrgID string,orgID string,pageNO int,conditions []string)(bool,string,[]MySwagger.Data) { flag, result, _ := DataexDAO.CheckConsumeOrgID(consumeType, consumeOrgID, orgID) if flag{ var lstEsdata []DataEX.ESData if orgID=="-1"{ flag,result,lstEsdata= ES7Util.IndexDocQuery(datasourceCode,nil,pageNO,conditions) }else { orgIDs:=DataexDAO.GetDatasourceOrgIDS(consumeType,orgID) flag,result,lstEsdata= ES7Util.IndexDocQuery(datasourceCode,orgIDs,pageNO,conditions) } if flag{ var lstData []MySwagger.Data var data MySwagger.Data for no:=0;no< len(lstEsdata);no++{ data.DataID=lstEsdata[no].DataId; data.OrgID=lstEsdata[no].OrgId data.DelFlag=lstEsdata[no].DelFlag bytes,_:=json.Marshal(lstEsdata[no].DataContent) data.Data=string(bytes) lstData=append(lstData,data) } return true,"获取数据成功",lstData }else { return false,result,nil } return flag,result,nil }else { return false,result,nil } }