From 568e795fe1c9645c8e16858d7db7b463f7ff13b1 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 12:57:42 +0800 Subject: [PATCH 1/8] 'commit' --- dsBaseRpc/DataExchange/DataExchange.go | 113 +++++++++++++----- .../BaseClass/BaseClassDao/BaseClassDao.go | 37 +++++- .../BaseTeacherService/BaseTeacherService.go | 9 ++ 3 files changed, 131 insertions(+), 28 deletions(-) diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index 9612c6b5..a75963c6 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -16,6 +16,9 @@ import ( var db = DbUtil.Engine +//每次获取的条数 +var limit = 100 + // 日志文件路径 var progressFilePath = "/usr/local/SyncDataLogs/" @@ -68,6 +71,52 @@ func init() { 时间:2020-07-16 */ func DataExchange() { + //是不是进行过首次上报,如果没有话,需要执行一次组织机构上报 + //日志文件位置 + logName := progressFilePath + "orgInit.log" + //判断文件是不是存在 + if !FileUtil.PathExists(logName) { + //上报组织机构,读取t_base_organization的SQL脚本 + //SQL内容 + sql := FileUtil.ReadFileContent("./Sql/t_base_organization.sql") + //组织机构是需要按org_type,area_code排序 + orgSql := sql + " order by t1.org_type,t1.area_code" + list, _ := db.SQL(orgSql).Query().List() + + var t tableStruct + for i := range sqlDict { + if sqlDict[i].TableName == "t_base_organization" { + t = sqlDict[i] + break + } + } + var count = 0 + var isFinish = false + for { + if isFinish { + break + } + //利用切片分批次上报 + if len(list) > limit { + success:=PostToServer(t, list[0:limit]) + if !success{ + continue + } + count = count + limit + list = list[limit:] + } else if len(list) > 0 { + PostToServer(t, list) + count = count + len(list) + isFinish = true + } else { + isFinish = true + } + fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成组织机构初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!") + } + //记录日志,内容为1,表示组织机构初始化上报过了~ + FileUtil.WriteContent(logName, "ok") + } + for { //本轮上报的数量,如果是0,休息5秒后再继续上传 postCount := UploadData() @@ -117,29 +166,8 @@ func UploadData() int { break } //上报到Http Api--->Body--->Post - var ps postStruct - ps.DataSource = sqlDict[i].DataSource - ps.AuthToken = "DSDataex_Token_eb4ab2fea87161dc08fa794a648584c4" - ps.SystemId = "BASE_GO" - - var dsMap = make([]dataStruct, 0) - for k := range list { - var ds dataStruct - ds.Data, _ = CommonUtil.MapToJson(list[k]) - ds.DataId = list[k][sqlDict[i].PrimaryKey].(string) - ds.DelFlag = list[k]["del_flag"].(int64) - ds.OrgId = list[k]["bureau_id"].(string) - dsMap = append(dsMap, ds) - } - ps.Datas = dsMap - //将Struct转为json - jsonBytes, _ := json.Marshal(ps) - msg := string(jsonBytes) - //提交到汇集中心 - p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg) - if !p.Success { - fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:"+p.Message) - time.Sleep(5 * 1e9) + success := PostToServer(sqlDict[i], list) + if !success { continue } //记录上报数量 @@ -162,16 +190,47 @@ func UploadData() int { return postCount } +/** +功能:将数据批量上报到汇集中心 +作者:黄海 +时间:2020-07-17 +*/ +func PostToServer(t tableStruct, list []map[string]interface{}) bool { + //上报到Http Api--->Body--->Post + var ps postStruct + ps.DataSource = t.DataSource + ps.AuthToken = "DSDataex_Token_eb4ab2fea87161dc08fa794a648584c4" + ps.SystemId = "BASE_GO" + + var dsMap = make([]dataStruct, 0) + for k := range list { + var ds dataStruct + ds.Data, _ = CommonUtil.MapToJson(list[k]) + ds.DataId = list[k][t.PrimaryKey].(string) + ds.DelFlag = list[k]["del_flag"].(int64) + ds.OrgId = list[k]["bureau_id"].(string) + dsMap = append(dsMap, ds) + } + ps.Datas = dsMap + //将Struct转为json + jsonBytes, _ := json.Marshal(ps) + msg := string(jsonBytes) + //提交到汇集中心 + p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg) + fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:" + p.Message) + time.Sleep(5 * 1e9) + return p.Success +} + /** * 功能:获取一个表的指定大小的数据 * 作者:黄海 * 时间:2019-01-16 */ func getRecord(tableName string, lastUpdatedTime string, idInt int64) (string, int64, []map[string]interface{}) { - //每次获取的条数 - limit := 100 //SQL内容 sqlPublic := FileUtil.ReadFileContent("./Sql/" + tableName + ".sql") + //时间戳相等的SQL eqSql := sqlPublic + " where t1.last_updated_time =? and t1.id_int>? order by t1.last_updated_time,t1.id_int limit ?" //时间戳大于的SQL @@ -245,8 +304,8 @@ type ResultStruct struct { // 基础方法,这里多用于访问webapi,配合上json转换。此方法可以运行但是不算完善。 func httpDo(method string, url string, msg string) ResultStruct { var p ResultStruct - p.Success=false - p.Message="上报到汇集系统失败!" + p.Success = false + p.Message = "上报到汇集系统失败!" client := &http.Client{} body := bytes.NewBuffer([]byte(msg)) req, err := http.NewRequest(method, diff --git a/dsBaseRpc/RpcService/BaseClass/BaseClassDao/BaseClassDao.go b/dsBaseRpc/RpcService/BaseClass/BaseClassDao/BaseClassDao.go index c4ef7208..622d3bc7 100644 --- a/dsBaseRpc/RpcService/BaseClass/BaseClassDao/BaseClassDao.go +++ b/dsBaseRpc/RpcService/BaseClass/BaseClassDao/BaseClassDao.go @@ -88,7 +88,7 @@ func PageBaseClass(in *BaseClassProto.QueryArg) ([]map[string]interface{}, int32 myBuilder.And(builder.Eq{"t1.stage_id": in.StageId}) myBuilder.And(builder.Eq{"t1.bureau_id": in.BureauId}) //排除某个班级,用在将学生进行调整班级时的需求 - if in.RemoveClassId!=""{ + if in.RemoveClassId != "" { myBuilder.And(builder.Neq{"t1.class_id": in.RemoveClassId}) } //获取拼接完成的SQL语句 @@ -152,3 +152,38 @@ func GetRxnf(schoolId string, stageId string) ([]map[string]interface{}, int32, } return SqlKit.QueryForPk(sql) } + +/** +功能:清除删除人员时,人员对应的班主任信息 +作者:黄海 +时间:2020-07-17 +*/ +func ClearPersonBzr(teacherIds []string) error { + var myBuilder = builder.Dialect(builder.MYSQL).Select("class_id"). + From("t_base_class"). + Where(builder.In("teacher_id", teacherIds)). + And(builder.Eq{"b_use": 1}) + sql, err := myBuilder.ToBoundSQL() + if err != nil { + return err + } + list, _, err := SqlKit.QueryForPk(sql) + if err != nil { + return err + } + var ClassIds []string + for i := range list { + id := list[i]["class_id"].(string) + ClassIds = append(ClassIds, id) + //删除班主任 + tbc := new(models.TBaseClass) + _, err := db.Table(tbc).ID(id).Update(map[string]interface{}{"teacher_id": ""}) + if err != nil { + return err + } + } + //清除Redis缓存 + var selector = SqlKit.GetBean("t_base_class") + SqlKit.DeleteCacheByIds(ClassIds, selector) + return nil +} diff --git a/dsBaseRpc/RpcService/BaseTeacher/BaseTeacherService/BaseTeacherService.go b/dsBaseRpc/RpcService/BaseTeacher/BaseTeacherService/BaseTeacherService.go index 8dbd4b4e..4fce7713 100644 --- a/dsBaseRpc/RpcService/BaseTeacher/BaseTeacherService/BaseTeacherService.go +++ b/dsBaseRpc/RpcService/BaseTeacher/BaseTeacherService/BaseTeacherService.go @@ -4,6 +4,7 @@ import ( "context" "dsBaseRpc/Const" "dsBaseRpc/Const/ErrorConst" + "dsBaseRpc/RpcService/BaseClass/BaseClassDao" "dsBaseRpc/RpcService/BaseOrganization/BaseOrganizationDao" "dsBaseRpc/RpcService/BaseOrganizationManager/BaseOrganizationManagerDao" "dsBaseRpc/RpcService/BaseTeacher/BaseTeacherDao" @@ -189,6 +190,14 @@ func (s *Rpc) DeleteBaseTeacher(ctx context.Context, in *BaseTeacherProto.Delete reply.Message = Const.DataBaseActionError return &reply, err } + //删除人员时,对应的班主任,也要删除一下~ + err=BaseClassDao.ClearPersonBzr(in.Ids) + if err != nil { + LogUtil.Error(ErrorConst.SqlUpdateError, "执行ClearPersonBzr时发生严重错误:"+err.Error()) + reply.Success = false + reply.Message = Const.DataBaseActionError + return &reply, err + } //记录日志 ms, err := BaseTeacherDao.GetByIds(in.Ids) if err != nil { From c727fb1d1b34b6f489e4afb325e20c479f349d0b Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 13:02:23 +0800 Subject: [PATCH 2/8] 'commit' --- dsBaseRpc/DataExchange/DataExchange.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index a75963c6..9e0ce1a7 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -98,14 +98,17 @@ func DataExchange() { } //利用切片分批次上报 if len(list) > limit { - success:=PostToServer(t, list[0:limit]) + success:=PostToServer(t, list[0:limit]) //0-99不包含100 if !success{ continue } count = count + limit list = list[limit:] } else if len(list) > 0 { - PostToServer(t, list) + success:=PostToServer(t, list) + if !success{ + continue + } count = count + len(list) isFinish = true } else { From ebc32692d5c8426c16307cce83c7e1fc1a1834eb Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 13:11:52 +0800 Subject: [PATCH 3/8] 'commit' --- dsBaseRpc/DataExchange/DataExchange.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index 9e0ce1a7..5f9d295d 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -73,7 +73,7 @@ func init() { func DataExchange() { //是不是进行过首次上报,如果没有话,需要执行一次组织机构上报 //日志文件位置 - logName := progressFilePath + "orgInit.log" + logName := progressFilePath + "t_base_organization.log" //判断文件是不是存在 if !FileUtil.PathExists(logName) { //上报组织机构,读取t_base_organization的SQL脚本 @@ -82,7 +82,13 @@ func DataExchange() { //组织机构是需要按org_type,area_code排序 orgSql := sql + " order by t1.org_type,t1.area_code" list, _ := db.SQL(orgSql).Query().List() - + var l logStruct + l.IdInt = 0 + l.StartUpdateTs = "1970-01-01 00:00:00" + if len(list)>0{ + l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string) + l.IdInt = list[len(list)-1]["id_int"].(int64) + } var t tableStruct for i := range sqlDict { if sqlDict[i].TableName == "t_base_organization" { @@ -116,8 +122,12 @@ func DataExchange() { } fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成组织机构初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!") } - //记录日志,内容为1,表示组织机构初始化上报过了~ - FileUtil.WriteContent(logName, "ok") + //记录日志 + jsonBytes, err := json.Marshal(l) + if err != nil { + fmt.Println(err.Error()) + } + FileUtil.WriteContent(logName, string(jsonBytes)) } for { From bb881e4ed3fbf3ef4d589d7670a54117554e9fdb Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 13:31:59 +0800 Subject: [PATCH 4/8] 'commit' --- dsBaseRpc/DataExchange/DataExchange.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index 5f9d295d..42757763 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -18,6 +18,8 @@ var db = DbUtil.Engine //每次获取的条数 var limit = 100 +//默认开始时间 +var defaultStartTs="1970-01-01 00:00:00" // 日志文件路径 var progressFilePath = "/usr/local/SyncDataLogs/" @@ -84,7 +86,7 @@ func DataExchange() { list, _ := db.SQL(orgSql).Query().List() var l logStruct l.IdInt = 0 - l.StartUpdateTs = "1970-01-01 00:00:00" + l.StartUpdateTs = defaultStartTs if len(list)>0{ l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string) l.IdInt = list[len(list)-1]["id_int"].(int64) @@ -151,7 +153,7 @@ func UploadData() int { for i := range sqlDict { count := 0 //默认的开始时间 - startUpdateTs := "1970-01-01 00:00:00" + startUpdateTs := defaultStartTs //默认的整数主键值 var idInt int64 = 0 //查询结果集 @@ -303,15 +305,9 @@ func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) ( } } -type FailResultStruct struct { - FailId string `json:"fail_id"` - FailReason string `json:"fail_reason"` -} - type ResultStruct struct { Message string `json:"message"` Success bool `json:"success"` - FailResults []FailResultStruct `json:"fail_results"` } // 基础方法,这里多用于访问webapi,配合上json转换。此方法可以运行但是不算完善。 From 017d8b2a4dafe08cb9ac1eb0496e1c92fd41c2a4 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 13:34:34 +0800 Subject: [PATCH 5/8] 'commit' --- dsBaseRpc/DataExchange/DataExchange.go | 43 ++++++++++++++++---------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index 42757763..ea8aab00 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -18,8 +18,9 @@ var db = DbUtil.Engine //每次获取的条数 var limit = 100 + //默认开始时间 -var defaultStartTs="1970-01-01 00:00:00" +var defaultStartTs = "1970-01-01 00:00:00" // 日志文件路径 var progressFilePath = "/usr/local/SyncDataLogs/" @@ -66,15 +67,13 @@ func init() { os.MkdirAll(progressFilePath, os.ModePerm) } } - /** -功能:数据上报 +功能:组织机构上报 作者:黄海 -时间:2020-07-16 -*/ -func DataExchange() { - //是不是进行过首次上报,如果没有话,需要执行一次组织机构上报 - //日志文件位置 +时间:2020-07-17 + */ +func InitOrg() { + //(1)是不是进行过首次上报,如果没有话,需要执行一次组织机构上报 logName := progressFilePath + "t_base_organization.log" //判断文件是不是存在 if !FileUtil.PathExists(logName) { @@ -86,8 +85,8 @@ func DataExchange() { list, _ := db.SQL(orgSql).Query().List() var l logStruct l.IdInt = 0 - l.StartUpdateTs = defaultStartTs - if len(list)>0{ + l.StartUpdateTs = defaultStartTs + if len(list) > 0 { l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string) l.IdInt = list[len(list)-1]["id_int"].(int64) } @@ -106,15 +105,15 @@ func DataExchange() { } //利用切片分批次上报 if len(list) > limit { - success:=PostToServer(t, list[0:limit]) //0-99不包含100 - if !success{ + success := PostToServer(t, list[0:limit]) //0-99不包含100 + if !success { continue } count = count + limit list = list[limit:] } else if len(list) > 0 { - success:=PostToServer(t, list) - if !success{ + success := PostToServer(t, list) + if !success { continue } count = count + len(list) @@ -131,9 +130,19 @@ func DataExchange() { } FileUtil.WriteContent(logName, string(jsonBytes)) } +} +/** +功能:数据上报 +作者:黄海 +时间:2020-07-16 +*/ +func DataExchange() { + //死循环上报中 for { - //本轮上报的数量,如果是0,休息5秒后再继续上传 + //(1)组织机构上报 + InitOrg() + //(2)本轮上报的数量,如果是0,休息5秒后再继续上传 postCount := UploadData() if postCount == 0 { fmt.Println(CommonUtil.GetCurrentTime() + " 同步:本轮没有可以上报的数据,将休息5秒!") @@ -306,8 +315,8 @@ func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) ( } type ResultStruct struct { - Message string `json:"message"` - Success bool `json:"success"` + Message string `json:"message"` + Success bool `json:"success"` } // 基础方法,这里多用于访问webapi,配合上json转换。此方法可以运行但是不算完善。 From 3c6e578c253bb0c12db0045047afaf10151a4936 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 13:38:14 +0800 Subject: [PATCH 6/8] 'commit' --- dsBaseRpc/Config/Config.ini | 4 +++- dsBaseRpc/DataExchange/DataExchange.go | 4 ++-- dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go | 6 +++++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/dsBaseRpc/Config/Config.ini b/dsBaseRpc/Config/Config.ini index 6db2b79d..f6bc271f 100644 --- a/dsBaseRpc/Config/Config.ini +++ b/dsBaseRpc/Config/Config.ini @@ -32,4 +32,6 @@ KafkaAddress = server.dsmin.com:9092 # 数据汇集的地址 [dataExchange] -url = http://10.10.14.239:9009/v1/dataex/DataexSet +url = http://10.10.14.239:9009/v1/dataex/DataexSet +AuthToken = DSDataex_Token_eb4ab2fea87161dc08fa794a648584c4 +SystemId = BASE_GO diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index ea8aab00..f1f542f0 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -223,8 +223,8 @@ func PostToServer(t tableStruct, list []map[string]interface{}) bool { //上报到Http Api--->Body--->Post var ps postStruct ps.DataSource = t.DataSource - ps.AuthToken = "DSDataex_Token_eb4ab2fea87161dc08fa794a648584c4" - ps.SystemId = "BASE_GO" + ps.AuthToken = ConfigUtil.DataExchangeAuthToken + ps.SystemId = ConfigUtil.DataExchangeSystemId var dsMap = make([]dataStruct, 0) for k := range list { diff --git a/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go b/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go index cb947408..dbc003d5 100644 --- a/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go +++ b/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go @@ -40,7 +40,9 @@ var ( KafkaAddress string //数据汇集中心地址 - DataExchangeUrl string + DataExchangeUrl string + DataExchangeAuthToken string + DataExchangeSystemId string ) func init() { @@ -98,6 +100,8 @@ func init() { //数据汇集中心地址 DataExchangeUrl=iniParser.GetString("dataExchange","url") + DataExchangeAuthToken =iniParser.GetString("dataExchange","AuthToken") + DataExchangeSystemId =iniParser.GetString("dataExchange","SystemId") } type IniParser struct { From 10569f52f7047e7c0b79b9c470c78dc17903bda6 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 13:40:35 +0800 Subject: [PATCH 7/8] 'commit' --- dsBaseRpc/DataExchange/DataExchange.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index f1f542f0..9b0a5e98 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -319,7 +319,7 @@ type ResultStruct struct { Success bool `json:"success"` } -// 基础方法,这里多用于访问webapi,配合上json转换。此方法可以运行但是不算完善。 +// 基础方法,这里多用于访问webapi,配合上json转换。 func httpDo(method string, url string, msg string) ResultStruct { var p ResultStruct p.Success = false From fc1987642765c4b56df0ab6677934462a9623b56 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 13:53:56 +0800 Subject: [PATCH 8/8] 'commit' --- dsBaseRpc/DataExchange/DataExchange.go | 26 ++++++++++++++++---------- dsBaseRpc/Sql/t_base_class.sql | 2 +- dsBaseRpc/Sql/t_base_organization.sql | 2 +- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index 9b0a5e98..3e7a66a7 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -17,7 +17,7 @@ import ( var db = DbUtil.Engine //每次获取的条数 -var limit = 100 +var limit = 200 //默认开始时间 var defaultStartTs = "1970-01-01 00:00:00" @@ -83,13 +83,7 @@ func InitOrg() { //组织机构是需要按org_type,area_code排序 orgSql := sql + " order by t1.org_type,t1.area_code" list, _ := db.SQL(orgSql).Query().List() - var l logStruct - l.IdInt = 0 - l.StartUpdateTs = defaultStartTs - if len(list) > 0 { - l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string) - l.IdInt = list[len(list)-1]["id_int"].(int64) - } + var t tableStruct for i := range sqlDict { if sqlDict[i].TableName == "t_base_organization" { @@ -124,6 +118,16 @@ func InitOrg() { fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成组织机构初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!") } //记录日志 + maxSql:=sql+" order by t1.last_updated_time,t1.id_int desc limit 1" + var l logStruct + l.IdInt = 0 + l.StartUpdateTs = defaultStartTs + //取得最后一行的最大值 + list,_=db.SQL(maxSql).Query().List() + if len(list) > 0 { + l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string) + l.IdInt = list[len(list)-1]["id_int"].(int64) + } jsonBytes, err := json.Marshal(l) if err != nil { fmt.Println(err.Error()) @@ -241,8 +245,10 @@ func PostToServer(t tableStruct, list []map[string]interface{}) bool { msg := string(jsonBytes) //提交到汇集中心 p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg) - fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:" + p.Message) - time.Sleep(5 * 1e9) + if !p.Success{ + fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:" + p.Message) + time.Sleep(5 * 1e9) + } return p.Success } diff --git a/dsBaseRpc/Sql/t_base_class.sql b/dsBaseRpc/Sql/t_base_class.sql index c5261d14..786e3eee 100644 --- a/dsBaseRpc/Sql/t_base_class.sql +++ b/dsBaseRpc/Sql/t_base_class.sql @@ -18,7 +18,7 @@ SELECT t1.`district_code` AS `district_code`, cast( t1.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`, DATE_FORMAT(t1.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`, - t1.`last_updated_time` AS `last_updated_time`, + DATE_FORMAT(t1.`last_updated_time`,'%Y/%m/%d %H:%i:%s') AS `last_updated_time`, `t_dm_stage`.`stage_name` AS `stage_name`, `t_base_organization`.`org_name` AS `org_name` , case t1.`b_use` when -1 then 1 else 0 end AS `del_flag` diff --git a/dsBaseRpc/Sql/t_base_organization.sql b/dsBaseRpc/Sql/t_base_organization.sql index 3533c896..80b28f1a 100644 --- a/dsBaseRpc/Sql/t_base_organization.sql +++ b/dsBaseRpc/Sql/t_base_organization.sql @@ -28,7 +28,7 @@ SELECT t1.`city_code` AS `city_code`, t1.`district_code` AS `district_code`, t1.`area_code` AS `area_code`, - t1.`last_updated_time` AS `last_updated_time`, + DATE_FORMAT(t1.`last_updated_time`,'%Y/%m/%d %H:%i:%s') AS `last_updated_time`, DATE_FORMAT(t1.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`, case t1.`b_use` when -1 then 1 else 0 end AS `del_flag` FROM