Merge branch 'master' of 10.10.14.250:huanghai/dsMin

master
zhangjun 5 years ago
commit bf090f0bc9

@ -32,10 +32,20 @@ type tableStruct struct {
DataSource string `json:"data_source"`
}
//有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序
var sqlDict = []tableStruct{
//有同步哪些表(增量),之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序
var IncrSqlDict = []tableStruct{
{TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"},
{TableName: "t_base_class", PrimaryKey: "class_id", DataSource: "org_class"},
{TableName: "t_base_teacher", PrimaryKey: "teacher_id", DataSource: "user_teacher"},
{TableName: "t_base_student", PrimaryKey: "student_id", DataSource: "user_student"},
{TableName: "t_sys_loginperson_log", PrimaryKey: "id", DataSource: "log_login"},
}
// 全量数据上报
var FullSqlDict = []tableStruct{
{TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"},
{TableName: "t_sys_dict", PrimaryKey: "dict_id", DataSource: "sys_dic"},
{TableName: "t_gov_area", PrimaryKey: "area_code", DataSource: "org_area"},
}
// 数据上报的结构体
@ -67,72 +77,77 @@ func init() {
os.MkdirAll(progressFilePath, os.ModePerm)
}
}
/**
2020-07-17
*/
func InitOrg() {
//(1)是不是进行过首次上报,如果没有话,需要执行一次组织机构上报
logName := progressFilePath + "t_base_organization.log"
//判断文件是不是存在
if !FileUtil.PathExists(logName) {
//上报组织机构,读取t_base_organization的SQL脚本
//SQL内容
sql := FileUtil.ReadFileContent("./Sql/t_base_organization.sql")
//组织机构是需要按org_type,area_code排序
orgSql := sql + " order by t1.org_type,t1.area_code"
list, _ := db.SQL(orgSql).Query().List()
*/
func InitFull() {
for i := range FullSqlDict {
//(1)是不是进行过首次上报,如果没有话,需要执行一次组织机构上报
logName := progressFilePath + FullSqlDict[i].TableName + ".log"
//判断文件是不是存在
if !FileUtil.PathExists(logName) {
//SQL内容
sql := FileUtil.ReadFileContent("./Sql/" + FullSqlDict[i].TableName + ".sql")
var t tableStruct
for i := range sqlDict {
if sqlDict[i].TableName == "t_base_organization" {
t = sqlDict[i]
break
var list []map[string]interface{}
//如果是组织机构表,那么需要变更一下查询的排序条件
if FullSqlDict[i].TableName == "t_base_organization" {
//组织机构是需要按org_type,area_code排序
sql = sql + " order by t1.org_type,t1.area_code"
}
}
var count = 0
var isFinish = false
for {
if isFinish {
break
list, _ = db.SQL(sql).Query().List()
var count = 0
var isFinish = false
for {
if isFinish {
break
}
//利用切片分批次上报
if len(list) > limit {
success := PostToServer(FullSqlDict[i], list[0:limit]) //0-99不包含100
if !success {
continue
}
count = count + limit
list = list[limit:]
} else if len(list) > 0 {
success := PostToServer(FullSqlDict[i], list)
if !success {
continue
}
count = count + len(list)
isFinish = true
} else {
isFinish = true
}
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成" + FullSqlDict[i].TableName + "初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!")
}
//利用切片分批次上报
if len(list) > limit {
success := PostToServer(t, list[0:limit]) //0-99不包含100
if !success {
continue
//对于组织机构进行特殊处理
if FullSqlDict[i].TableName == "t_base_organization" {
//记录日志
maxSql := sql + " order by t1.last_updated_time desc,t1.id_int desc limit 1"
var l logStruct
l.IdInt = 0
l.StartUpdateTs = defaultStartTs
//取得最后一行的最大值
list, _ = db.SQL(maxSql).Query().List()
if len(list) > 0 {
l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string)
l.IdInt = list[len(list)-1]["id_int"].(int64)
}
count = count + limit
list = list[limit:]
} else if len(list) > 0 {
success := PostToServer(t, list)
if !success {
continue
jsonBytes, err := json.Marshal(l)
if err != nil {
fmt.Println(err.Error())
}
count = count + len(list)
isFinish = true
FileUtil.WriteContent(logName, string(jsonBytes))
} else {
isFinish = true
FileUtil.WriteContent(logName, "is finished!")
}
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成组织机构初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!")
}
//记录日志
maxSql:=sql+" order by t1.last_updated_time,t1.id_int desc limit 1"
var l logStruct
l.IdInt = 0
l.StartUpdateTs = defaultStartTs
//取得最后一行的最大值
list,_=db.SQL(maxSql).Query().List()
if len(list) > 0 {
l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string)
l.IdInt = list[len(list)-1]["id_int"].(int64)
}
jsonBytes, err := json.Marshal(l)
if err != nil {
fmt.Println(err.Error())
}
FileUtil.WriteContent(logName, string(jsonBytes))
}
}
@ -145,7 +160,8 @@ func DataExchange() {
//死循环上报中
for {
//(1)组织机构上报
InitOrg()
InitFull()
//(2)本轮上报的数量如果是0休息5秒后再继续上传
postCount := UploadData()
if postCount == 0 {
@ -163,7 +179,7 @@ func DataExchange() {
func UploadData() int {
var postCount = 0
//遍历所有的配置节,进行循环
for i := range sqlDict {
for i := range IncrSqlDict {
count := 0
//默认的开始时间
startUpdateTs := defaultStartTs
@ -172,7 +188,7 @@ func UploadData() int {
//查询结果集
var list []map[string]interface{}
//表名
tableName := sqlDict[i].TableName
tableName := IncrSqlDict[i].TableName
//日志文件位置
logName := progressFilePath + tableName + ".log"
//判断文件是不是存在
@ -194,7 +210,7 @@ func UploadData() int {
break
}
//上报到Http Api--->Body--->Post
success := PostToServer(sqlDict[i], list)
success := PostToServer(IncrSqlDict[i], list)
if !success {
continue
}
@ -229,12 +245,18 @@ func PostToServer(t tableStruct, list []map[string]interface{}) bool {
ps.DataSource = t.DataSource
ps.AuthToken = ConfigUtil.DataExchangeAuthToken
ps.SystemId = ConfigUtil.DataExchangeSystemId
var dsMap = make([]dataStruct, 0)
for k := range list {
var ds dataStruct
ds.Data, _ = CommonUtil.MapToJson(list[k])
ds.DataId = list[k][t.PrimaryKey].(string)
switch list[k][t.PrimaryKey].(type) {
case int64:
ds.DataId = CommonUtil.ConvertInt64ToString(list[k][t.PrimaryKey].(int64))
break
default:
ds.DataId = list[k][t.PrimaryKey].(string)
break
}
ds.DelFlag = list[k]["del_flag"].(int64)
ds.OrgId = list[k]["bureau_id"].(string)
dsMap = append(dsMap, ds)
@ -245,7 +267,7 @@ func PostToServer(t tableStruct, list []map[string]interface{}) bool {
msg := string(jsonBytes)
//提交到汇集中心
p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg)
if !p.Success{
if !p.Success {
fmt.Println(CommonUtil.GetCurrentTime() + " 同步上报到数据汇集中心失败将休息5秒后重试错误原因" + p.Message)
time.Sleep(5 * 1e9)
}
@ -270,7 +292,7 @@ func getRecord(tableName string, lastUpdatedTime string, idInt int64) (string, i
var rsGt = make([]map[string]interface{}, 0)
lastUpdatedTime, idInt, rsEq = getRecordEq(eqSql, lastUpdatedTime, idInt, limit)
if len(rsEq) < limit {
if rsEq == nil || len(rsEq) < limit {
//尝试一次gt操作(组合一下)
lastUpdatedTime, idInt, rsGt = getRecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq))
if len(rsGt) > 0 {

@ -493,3 +493,17 @@ func DisableBureauManager(ids []string) error {
return nil
}
/**
2020-07-17
*/
func IsBuMen(orgId string) bool{
//判断是单位还是部门
list := SqlKit.QueryByIds([]string{orgId}, "t_base_organization")
if list[0]["org_type"].(int64)==3{
return true
}
return false
}

@ -45,13 +45,15 @@ func (s *Rpc) GetBaseOrganization(ctx context.Context, in *BaseOrganizationProto
func (s *Rpc) AddBaseOrganization(ctx context.Context, in *BaseOrganizationProto.ModelArg) (*BaseOrganizationProto.Reply, error) {
//rpc响应
var reply BaseOrganizationProto.Reply
//是不是部门
isBuMen := false
//2:学校7:教辅单位8:大学区,不在278中应该就是指部门
if in.OrgType == 2 || in.OrgType == 7 || in.OrgType == 8 {
//1:教育局,2:学校7:教辅单位8:大学区,不在1,278中应该就是指部门
if in.OrgType == 1 || in.OrgType == 2 || in.OrgType == 7 || in.OrgType == 8 {
//1、检查非空
if len(in.OrgCode) == 0 {
reply.Success = false
reply.Message = "编码代码为空,不能创建!"
reply.Message = "代码为空,不能创建!"
return &reply, nil
}
//2、检查AreaCode是否存在不存在需要提示
@ -67,6 +69,8 @@ func (s *Rpc) AddBaseOrganization(ctx context.Context, in *BaseOrganizationProto
reply.Message = "区域代码不存在,不能创建!"
return &reply, nil
}
} else {
isBuMen = true
}
//3、指定的org_code是否存在
@ -81,7 +85,7 @@ func (s *Rpc) AddBaseOrganization(ctx context.Context, in *BaseOrganizationProto
}
if exist {
reply.Success = false
reply.Message = "码已存在,无法创建!"
reply.Message = "码已存在,无法创建!"
return &reply, err
}
@ -95,7 +99,11 @@ func (s *Rpc) AddBaseOrganization(ctx context.Context, in *BaseOrganizationProto
}
if exist {
reply.Success = false
reply.Message = "单位/部门名称已存在,不能创建!"
if isBuMen {
reply.Message = "部门名称已存在,不能创建!"
} else {
reply.Message = "单位名称已存在,不能创建!"
}
return &reply, err
}
//5、调用dao
@ -230,7 +238,7 @@ func (s *Rpc) DeleteBaseOrganization(ctx context.Context, in *BaseOrganizationPr
//rpc响应
var reply BaseOrganizationProto.Reply
//遍历每一个单位/部门进行检查
//遍历每一个单位+部门进行检查
for i := range in.Ids {
//判断部门下是不是有可用的人员
var inTeacher BaseTeacherProto.QueryArg
@ -247,7 +255,12 @@ func (s *Rpc) DeleteBaseOrganization(ctx context.Context, in *BaseOrganizationPr
}
if count > 0 {
reply.Success = false
reply.Message = "单位/部门下存在教工数据,无法删除!"
//判断是单位还是部门
if BaseOrganizationDao.IsBuMen(inTeacher.OrgId){
reply.Message = "部门下存在教工数据,无法删除!"
}else{
reply.Message = "单位下存在教工数据,无法删除!"
}
return &reply, err
}
//判断部门下是不是有可用的子部门
@ -260,7 +273,12 @@ func (s *Rpc) DeleteBaseOrganization(ctx context.Context, in *BaseOrganizationPr
}
if count > 0 {
reply.Success = false
reply.Message = "单位/部门下存在子部门,无法删除!"
//判断是单位还是部门
if BaseOrganizationDao.IsBuMen(inTeacher.OrgId){
reply.Message = "部门下存在子部门,无法删除!"
}else{
reply.Message = "单位下存在子部门,无法删除!"
}
return &reply, err
}
//判断是不是有可用的班级
@ -277,7 +295,12 @@ func (s *Rpc) DeleteBaseOrganization(ctx context.Context, in *BaseOrganizationPr
}
if count > 0 {
reply.Success = false
reply.Message = "单位/部门下存在班级数据,无法删除!"
//判断是单位还是部门
if BaseOrganizationDao.IsBuMen(inTeacher.OrgId){
reply.Message = "部门下存在班级,无法删除!"
}else{
reply.Message = "单位下存在班级,无法删除!"
}
return &reply, err
}
}
@ -333,13 +356,13 @@ func (s *Rpc) UpdateBaseOrganization(ctx context.Context, in *BaseOrganizationPr
var reply BaseOrganizationProto.Reply
//1、指定的org_code是否存在
list := SqlKit.QueryByIds([]string{in.OrgId}, "t_base_organization")
if list==nil || len(list)==0{
if list == nil || len(list) == 0 {
reply.Success = false
reply.Message = "输入的OrgId无法找到对应的单位ID"
return &reply, nil
}
//根据OrgId换取回bureau_id 黄海修改于2020-07-16
bureauId:=list[0]["bureau_id"].(string)
bureauId := list[0]["bureau_id"].(string)
exist, err := BaseOrganizationDao.IsExistOrgCode(in.OrgCode, bureauId, in.OrgType, in.OrgId)
if err != nil {
reply.Success = false
@ -348,10 +371,9 @@ func (s *Rpc) UpdateBaseOrganization(ctx context.Context, in *BaseOrganizationPr
return &reply, err
}
if exist {
reply.Success = false
reply.Message = "码已存在,无法修改!"
reply.Message = "码已存在,无法修改!"
return &reply, err
}
//2、检查机构名称是否存在,否则在导入人员时,不知道向哪个单位下进行导入
@ -364,7 +386,11 @@ func (s *Rpc) UpdateBaseOrganization(ctx context.Context, in *BaseOrganizationPr
}
if exist {
reply.Success = false
reply.Message = "单位/部门名称已存在,无法修改!"
if BaseOrganizationDao.IsBuMen(in.OrgId){
reply.Message = "部门名称已存在,无法修改!"
}else{
reply.Message = "单位名称已存在,无法修改!"
}
return &reply, err
}
//3、修改

@ -103,7 +103,7 @@ func PageBaseTeacher(in *BaseTeacherProto.QueryArg) ([]map[string]interface{}, i
//所在单位ID
list := SqlKit.QueryByIds([]string{in.OrgId}, "t_base_organization")
if list == nil {
return nil, 0, errors.New("没有找到此单位码!")
return nil, 0, errors.New("没有找到此单位码!")
}
bureauId := list[0]["bureau_id"].(string)
if in.OrgId == bureauId {
@ -163,10 +163,33 @@ func FillLoginInfo(list *[]map[string]interface{}) error {
//对原始数据进行扩展
for i := range list2 {
record := list2[i]
record["login_name"] = _map[record["person_id"].(string)]["login_name"].(string)
record["pwd"] = _map[record["person_id"].(string)]["pwd"].(string)
record["original_pwd"] = _map[record["person_id"].(string)]["original_pwd"].(string)
record["login_b_use"] = _map[record["person_id"].(string)]["b_use"].(int64)
_, ok := _map[record["person_id"].(string)]["login_name"]
if ok {
record["login_name"] = _map[record["person_id"].(string)]["login_name"].(string)
} else {
record["login_name"] = "未找到"
}
_, ok = _map[record["person_id"].(string)]["pwd"]
if ok {
record["pwd"] = _map[record["person_id"].(string)]["pwd"].(string)
} else {
record["pwd"] = "未找到"
}
_, ok = _map[record["person_id"].(string)]["original_pwd"]
if ok {
record["original_pwd"] = _map[record["person_id"].(string)]["original_pwd"].(string)
} else {
record["original_pwd"] = "未找到"
}
_, ok = _map[record["person_id"].(string)]["b_use"]
if ok {
record["login_b_use"] = _map[record["person_id"].(string)]["b_use"].(int64)
} else {
record["login_b_use"] = 0
}
}
return nil
}

@ -0,0 +1,39 @@
SELECT
cast( t1.`person_id` AS CHAR ( 36 ) charset utf8 ) AS `student_id`,
t1.`id_int` AS `id_int`,
t1.`xm` AS `xm`,
t1.`xmpy` AS `xmpy`,
t1.`cym` AS `cym`,
t1.`xbm` AS `xbm`,
DATE_FORMAT(t1.`csrq`,'%Y/%m/%d %H:%i:%s') AS `csrq`,
t1.`mzm` AS `mzm`,
t1.`zzmmm` AS `zzmmm`,
t1.`sfzjlxm` AS `sfzjlxm`,
t1.`sfzjh` AS `sfzjh`,
t1.`dszybz` AS `dszybz`,
t1.`sqznbz` AS `sqznbz`,
t1.`jcwgrysqznbz` AS `jcwgrysqznbz`,
t1.`gebz` AS `gebz`,
t1.`lsetbz` AS `lsetbz`,
t1.`cjbz` AS `cjbz`,
cast( t1.`class_id` AS CHAR ( 36 ) charset utf8 ) AS `class_id`,
cast( t1.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`,
t1.`b_use` AS `b_use`,
t1.`state_id` AS `state_id`,
t1.`province_code` AS `province_code`,
t1.`city_code` AS `city_code`,
t1.`district_code` AS `district_code`,
cast( t1.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`,
DATE_FORMAT(t1.`last_updated_time`,'%Y/%m/%d %H:%i:%s') AS `last_updated_time`,
`t_base_organization`.`main_school_type` AS `main_school_type`,
`t_base_organization`.`xxbbm` AS `xxbbm`,
`t_base_organization`.`xxbxlxm` AS `xxbxlxm`,
`t_base_organization`.`szdcxlxm` AS `szdcxlxm`,
`t_base_class`.`stage_id` AS `stage_id`,
cast( `t_base_class`.`rxnf` AS CHAR ( 4 ) charset utf8 ) AS `rxnf` ,
case t1.`b_use` when -1 then 1 else 0 end AS `del_flag`
FROM
((
`t_base_student` as t1
JOIN `t_base_organization` ON ( t1.`bureau_id` = `t_base_organization`.`org_id` ))
JOIN `t_base_class` ON ( t1.`class_id` = `t_base_class`.`class_id` ))

@ -0,0 +1,38 @@
select cast(t1.`person_id` as char(36) charset utf8) AS `teacher_id`,
t1.`identity_id` AS `identity_id`,
t1.`id_int` AS `id_int`,
t1.`xm` AS `xm`,
t1.`xmpy` AS `xmpy`,
t1.`cym` AS `cym`,
t1.`xbm` AS `xbm`,
t1.`csrq` AS `csrq`,
t1.`mzm` AS `mzm`,
t1.`zzmmm` AS `zzmmm`,
t1.`sfzjlxm` AS `sfzjlxm`,
t1.`sfzjh` AS `sfzjh`,
t1.`xlm` AS `xlm`,
t1.`xwm` AS `xwm`,
t1.`zcm` AS `zcm`,
t1.`bzlbm` AS `bzlbm`,
t1.`stage_id` AS `stage_id`,
t1.`subject_id` AS `subject_id`,
t1.`gwzym` AS `gwzym`,
t1.`lxdh` AS `lxdh`,
t1.`dzxx` AS `dzxx`,
t1.`b_use` AS `b_use`,
t1.`state_id` AS `state_id`,
cast(t1.`bureau_id` as char(36) charset utf8) AS `bureau_id`,
cast(t1.`org_id` as char(36) charset utf8) AS `org_id`,
cast(t1.`main_school_id` as char(36) charset utf8) AS `main_school_id`,
t1.`province_code` AS `province_code`,
t1.`city_code` AS `city_code`,
t1.`district_code` AS `district_code`,
t1.`sort_id` AS `sort_id`,
DATE_FORMAT(t1.`last_updated_time`,'%Y/%m/%d %H:%i:%s') AS `last_updated_time`,
DATE_FORMAT(t1.`cjny`,'%Y/%m/%d %H:%i:%s') AS `cjny`,
case t1.`b_use` when -1 then 1 else 0 end AS `del_flag`,
`t_base_organization`.`main_school_type` AS `main_school_type`,
`t_base_organization`.`xxbbm` AS `xxbbm`,
`t_base_organization`.`xxbxlxm` AS `xxbxlxm`,
`t_base_organization`.`szdcxlxm` AS `szdcxlxm`
from (`t_base_teacher` as t1 join `t_base_organization` on (t1.`org_id` = `t_base_organization`.`org_id`))

@ -0,0 +1,2 @@
select t1.area_code,t1.area_name,t1.master_code,t1.area_level_id,t1.area_type_id,'-1' as bureau_id,
0 as del_flag from t_gov_area as t1

@ -0,0 +1,13 @@
SELECT
t1.`dict_id` AS `dict_id`,
t1.`dict_kind` AS `dict_kind`,
t1.`dict_code` AS `dict_code`,
t1.`dict_value` AS `dict_value`,
t1.`dict_remark` AS `dict_remark`,
t1.`dict_parent` AS `dict_parent`,
t1.`sort_id` AS `sort_id`,
cast( t1.`b_use` AS signed ) AS `b_use`,
'-1' as bureau_id,
case t1.`b_use` when -1 then 1 else 0 end AS `del_flag`
FROM
`t_sys_dict` as t1

@ -0,0 +1,10 @@
select t1.id,identity_id,t1.person_id,t1.login_name,t1.ip_address,
DATE_FORMAT(t1.`login_time`,'%Y/%m/%d %H:%i:%s') AS `login_time`,
t1.province_code,
t1.city_code,t1.district_code,t1.bureau_id,
DATE_FORMAT(t1.`last_updated_time`,'%Y/%m/%d %H:%i:%s') AS `last_updated_time`,
t1.login_state,
0 as del_flag
from t_sys_loginperson_log as t1

@ -1,76 +0,0 @@
// GENERATED BY THE COMMAND ABOVE; DO NOT EDIT
// This file was generated by swaggo/swag at
// 2020-05-20 21:35:10.8716006 +0800 CST m=+0.063828701
package docs
import (
"bytes"
"encoding/json"
"strings"
"github.com/alecthomas/template"
"github.com/swaggo/swag"
)
var doc = `{
"schemes": {{ marshal .Schemes }},
"swagger": "2.0",
"info": {
"description": "{{.Description}}",
"title": "{{.Title}}",
"contact": {},
"license": {},
"version": "{{.Version}}"
},
"host": "{{.Host}}",
"basePath": "{{.BasePath}}",
"paths": {}
}`
type swaggerInfo struct {
Version string
Host string
BasePath string
Schemes []string
Title string
Description string
}
// SwaggerInfo holds exported Swagger Info so clients can modify it
var SwaggerInfo = swaggerInfo{
Version: "",
Host: "",
BasePath: "",
Schemes: []string{},
Title: "",
Description: "",
}
type s struct{}
func (s *s) ReadDoc() string {
sInfo := SwaggerInfo
sInfo.Description = strings.Replace(sInfo.Description, "\n", "\\n", -1)
t, err := template.New("swagger_info").Funcs(template.FuncMap{
"marshal": func(v interface{}) string {
a, _ := json.Marshal(v)
return string(a)
},
}).Parse(doc)
if err != nil {
return doc
}
var tpl bytes.Buffer
if err := t.Execute(&tpl, sInfo); err != nil {
return doc
}
return tpl.String()
}
func init() {
swag.Register(swag.Name, &s{})
}

@ -1,8 +0,0 @@
{
"swagger": "2.0",
"info": {
"contact": {},
"license": {}
},
"paths": {}
}

@ -1,5 +0,0 @@
info:
contact: {}
license: {}
paths: {}
swagger: "2.0"
Loading…
Cancel
Save