diff --git a/dsBaseRpc/DataEx/DataEx.go b/dsBaseRpc/DataEx/DataEx.go index daf8e51e..afd15bf5 100644 --- a/dsBaseRpc/DataEx/DataEx.go +++ b/dsBaseRpc/DataEx/DataEx.go @@ -1,15 +1,20 @@ package DataEx import ( - "dsBaseRpc/Const" + "dsBaseRpc/Utils/CommonUtil" "dsBaseRpc/Utils/DbUtil" - "dsBaseRpc/Utils/LogUtil" + "dsBaseRpc/Utils/FileUtil" + "encoding/json" "fmt" + "time" ) var db = DbUtil.Engine -//有同步哪些表 +// 日志文件路径 +var progressFilePath = "/usr/local/SyncDataLogs/" + +//有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 var sqlDict = []string{"t_base_organization", "t_base_class"} /** @@ -19,20 +24,134 @@ var sqlDict = []string{"t_base_organization", "t_base_class"} */ func DataExchange() { for { - //本轮上报的数量,如果是0,休息10秒后再继续上传 - count:=0 - //遍历所有的配置节,进行循环 - for i := range sqlDict { - //判断日志目录下的记录是不是存在? - - //存在则读取last_updated_time+id_int进行分页获取 - paramMap := map[string]interface{}{"last_updated_time": "2020-07-15 00:00:00"} - list, err := db.SqlMapClient(sqlDict[i], ¶mMap).Query().List() - if err != nil { - LogUtil.Error(Const.DataBaseActionError, "数据上报时发生了严重错误:"+err.Error()) + //本轮上报的数量,如果是0,休息5秒后再继续上传 + postCount := UploadData() + if postCount == 0 { + fmt.Println("同步:本轮没有可以上报的数据,将休息5秒!") + time.Sleep(5 * 1e9) + } + } +} + +// 日志文件对应的结构体 +type logStruct struct { + startUpdateTs string + idInt int +} + +/** + * 功能:上报所有大于给定时间戳的数据 + * 作者:黄海 + * 时间:2019-01-02 + */ +func UploadData() int { + var postCount = 0 + //遍历所有的配置节,进行循环 + for i := range sqlDict { + count := 0 + //默认的开始时间 + startUpdateTs := "1970-01-01 00:00:00" + //默认的整数主键值 + idInt := 0 + //查询结果集 + var list []map[string]interface{} + //表名 + tableName := sqlDict[i] + //日志文件位置 + logName := progressFilePath + tableName + ".log" + //判断文件是不是存在 + if FileUtil.PathExists(logName) { + //读取配置信息 + var logstruct logStruct + if err := json.Unmarshal([]byte(FileUtil.ReadFileContent(logName)), &logstruct); err == nil { + startUpdateTs = logstruct.startUpdateTs + idInt = logstruct.idInt + } else { + fmt.Println("解析JSON文件失败:" + err.Error()) + } + } + //一直循环 + for { + //获取数据 + startUpdateTs, idInt, list = getRecord(tableName, startUpdateTs, idInt) + if len(list) == 0 { break } - fmt.Println(list) + //上报到Http Api + //TODO + + //记录上报数量 + count = count + len(list) + //记录日志 + var l logStruct + l.idInt = idInt + l.startUpdateTs = startUpdateTs + jsonBytes, _ := json.Marshal(l) + FileUtil.WriteContent(logName, string(jsonBytes)) + //提示信息 + fmt.Println("同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + + "个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertIntToString(idInt)) + } + postCount = postCount + count + } + return postCount +} + +/** + * 功能:获取一个表的指定大小的数据 + * 作者:黄海 + * 时间:2019-01-16 + */ +func getRecord(tableName string, lastUpdatedTime string, idInt int) (string, int, []map[string]interface{}) { + //每次获取的条数 + limit := 100 + //SQL内容 + sqlPublic := FileUtil.ReadFileContent("./Sql/" + tableName + ".sql") + //时间戳相等的SQL + eqSql := sqlPublic + " where t1.last_updated_time =? and t1.id_int>? order by t1.last_updated_time,t1.id_int limit ?" + //时间戳大于的SQL + gtSql := sqlPublic + " where t1.last_updated_time >? order by t1.last_updated_time,t1.id_int limit ?" + //(1)计算相等的 + var rsEq = make([]map[string]interface{}, 0) + var rsGt = make([]map[string]interface{}, 0) + lastUpdatedTime, idInt, rsEq = getrecordEq(eqSql, lastUpdatedTime, idInt, limit) + + if len(rsEq) < limit { + //尝试一次gt操作(组合一下) + lastUpdatedTime, idInt, rsGt = getrecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq)) + if len(rsGt) > 0 { + //拼装一下,合并两个结果集 + for i := range rsGt { + rsEq = append(rsEq, rsGt[i]) + } + //更改一下 + lastUpdatedTime = rsGt[len(rsGt)-1]["last_updated_time"].(string) + idInt = rsGt[len(rsGt)-1]["id_int"].(int) } } + return lastUpdatedTime, idInt, rsEq +} + +func getrecordEq(eqSql string, lastUpdatedTime string, idInt int, limit int) (string, int, []map[string]interface{}) { + rs, _ := db.SQL(eqSql, lastUpdatedTime, idInt, limit).Query().List() + if rs != nil && len(rs) > 0 { + //取出最后一行的数据进行回写 + lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) + idInt = rs[len(rs)-1]["id_int"].(int) + return lastUpdatedTime, idInt, rs + } else { + return lastUpdatedTime, idInt, nil + } +} + +func getrecordGt(gtSql string, lastUpdatedTime string, idInt int, limit int) (string, int, []map[string]interface{}) { + rs, _ := db.SQL(gtSql, lastUpdatedTime, limit).Query().List() + if rs != nil && len(rs) > 0 { + //取出最后一行的数据进行回写 + lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) + idInt := rs[len(rs)-1]["id_int"].(int) + return lastUpdatedTime, idInt, rs + } else { + return lastUpdatedTime, idInt, nil + } } diff --git a/dsBaseRpc/Sql/ds_base.xml b/dsBaseRpc/Sql/ds_base.xml deleted file mode 100644 index 6016d265..00000000 --- a/dsBaseRpc/Sql/ds_base.xml +++ /dev/null @@ -1,79 +0,0 @@ - - - SELECT - cast( `t_base_organization`.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`, - `t_base_organization`.`id_int` AS `id_int`, - `t_base_organization`.`org_code` AS `org_code`, - `t_base_organization`.`org_name` AS `org_name`, - cast( `t_base_organization`.`parent_id` AS CHAR ( 36 ) charset utf8 ) AS `parent_id`, - cast( `t_base_organization`.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`, - `t_base_organization`.`org_type` AS `org_type`, - `t_base_organization`.`edu_assist_type` AS `edu_assist_type`, - `t_base_organization`.`main_school_type` AS `main_school_type`, - cast( `t_base_organization`.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`, - cast( `t_base_organization`.`manage_org_id` AS CHAR ( 36 ) charset utf8 ) AS `manage_org_id`, - `t_base_organization`.`directly_under_type` AS `directly_under_type`, - `t_base_organization`.`xxbbm` AS `xxbbm`, - `t_base_organization`.`xxbxlxm` AS `xxbxlxm`, - `t_base_organization`.`szdcxlxm` AS `szdcxlxm`, - `t_base_organization`.`xxjbzm` AS `xxjbzm`, - cast( `t_base_organization`.`fzr` AS CHAR ( 36 ) charset utf8 ) AS `fzr`, - `t_base_organization`.`fddbr` AS `fddbr`, - `t_base_organization`.`fddbrdh` AS `fddbrdh`, - `t_base_organization`.`address` AS `address`, - `t_base_organization`.`lxdh` AS `lxdh`, - `t_base_organization`.`org_lng` AS `org_lng`, - `t_base_organization`.`org_lat` AS `org_lat`, - `t_base_organization`.`sort_id` AS `sort_id`, - cast( `t_base_organization`.`b_use` AS signed ) AS `b_use`, - `t_base_organization`.`province_code` AS `province_code`, - `t_base_organization`.`city_code` AS `city_code`, - `t_base_organization`.`district_code` AS `district_code`, - `t_base_organization`.`area_code` AS `area_code`, - `t_base_organization`.`last_updated_time` AS `last_updated_time`, - DATE_FORMAT(`t_base_organization`.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`, - case `t_base_organization`.`b_use` when -1 then 1 else 0 end AS `del_flag` - FROM - `t_base_organization` - WHERE - `t_base_organization`.`last_updated_time`>?last_updated_time - ORDER BY - `t_base_organization`.`last_updated_time` - - - - SELECT - cast( `t_base_class`.`class_id` AS CHAR ( 36 ) charset utf8 ) AS `class_id`, - `t_base_class`.`id_int` AS `id_int`, - `t_base_class`.`bh` AS `bh`, - `t_base_class`.`class_code` AS `class_code`, - `t_base_class`.`class_name` AS `class_name`, - `t_base_class`.`class_alias` AS `class_alias`, - `t_base_class`.`rxnf` AS `rxnf`, - `t_base_class`.`rxjj` AS `rxjj`, - `t_base_class`.`schooling_length` AS `schooling_length`, - `t_base_class`.`stage_id` AS `stage_id`, - cast( `t_base_class`.`teacher_id` AS CHAR ( 36 ) charset utf8 ) AS `teacher_id`, - cast( `t_base_class`.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`, - cast( `t_base_class`.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`, - cast( `t_base_class`.`b_use` AS signed ) AS `b_use`, - `t_base_class`.`province_code` AS `province_code`, - `t_base_class`.`city_code` AS `city_code`, - `t_base_class`.`district_code` AS `district_code`, - cast( `t_base_class`.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`, - DATE_FORMAT(`t_base_class`.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`, - `t_base_class`.`last_updated_time` AS `last_updated_time`, - `t_dm_stage`.`stage_name` AS `stage_name`, - `t_base_organization`.`org_name` AS `org_name` , - case `t_base_class`.`b_use` when -1 then 1 else 0 end AS `del_flag` - FROM - (( - `t_base_class` - JOIN `t_dm_stage` ON ( `t_base_class`.`stage_id` = `t_dm_stage`.`stage_id` )) - JOIN `t_base_organization` ON ( `t_base_class`.`bureau_id` = `t_base_organization`.`org_id` )) - WHERE - `t_base_class`.`last_updated_time`>?last_updated_time - ORDER BY - `t_base_class`.`last_updated_time` - - diff --git a/dsBaseRpc/Sql/t_base_class.sql b/dsBaseRpc/Sql/t_base_class.sql new file mode 100644 index 00000000..c5261d14 --- /dev/null +++ b/dsBaseRpc/Sql/t_base_class.sql @@ -0,0 +1,28 @@ +SELECT + cast( t1.`class_id` AS CHAR ( 36 ) charset utf8 ) AS `class_id`, + t1.`id_int` AS `id_int`, + t1.`bh` AS `bh`, + t1.`class_code` AS `class_code`, + t1.`class_name` AS `class_name`, + t1.`class_alias` AS `class_alias`, + t1.`rxnf` AS `rxnf`, + t1.`rxjj` AS `rxjj`, + t1.`schooling_length` AS `schooling_length`, + t1.`stage_id` AS `stage_id`, + cast( t1.`teacher_id` AS CHAR ( 36 ) charset utf8 ) AS `teacher_id`, + cast( t1.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`, + cast( t1.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`, + cast( t1.`b_use` AS signed ) AS `b_use`, + t1.`province_code` AS `province_code`, + t1.`city_code` AS `city_code`, + t1.`district_code` AS `district_code`, + cast( t1.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`, + DATE_FORMAT(t1.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`, + t1.`last_updated_time` AS `last_updated_time`, + `t_dm_stage`.`stage_name` AS `stage_name`, + `t_base_organization`.`org_name` AS `org_name` , + case t1.`b_use` when -1 then 1 else 0 end AS `del_flag` + FROM + ((t_base_class as t1 JOIN `t_dm_stage` ON ( t1.`stage_id` = `t_dm_stage`.`stage_id` )) + JOIN `t_base_organization` ON ( t1.`bureau_id` = `t_base_organization`.`org_id` )) + \ No newline at end of file diff --git a/dsBaseRpc/Sql/t_base_organization.sql b/dsBaseRpc/Sql/t_base_organization.sql new file mode 100644 index 00000000..3533c896 --- /dev/null +++ b/dsBaseRpc/Sql/t_base_organization.sql @@ -0,0 +1,36 @@ +SELECT + cast( t1.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`, + t1.`id_int` AS `id_int`, + t1.`org_code` AS `org_code`, + t1.`org_name` AS `org_name`, + cast( t1.`parent_id` AS CHAR ( 36 ) charset utf8 ) AS `parent_id`, + cast( t1.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`, + t1.`org_type` AS `org_type`, + t1.`edu_assist_type` AS `edu_assist_type`, + t1.`main_school_type` AS `main_school_type`, + cast( t1.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`, + cast( t1.`manage_org_id` AS CHAR ( 36 ) charset utf8 ) AS `manage_org_id`, + t1.`directly_under_type` AS `directly_under_type`, + t1.`xxbbm` AS `xxbbm`, + t1.`xxbxlxm` AS `xxbxlxm`, + t1.`szdcxlxm` AS `szdcxlxm`, + t1.`xxjbzm` AS `xxjbzm`, + cast( t1.`fzr` AS CHAR ( 36 ) charset utf8 ) AS `fzr`, + t1.`fddbr` AS `fddbr`, + t1.`fddbrdh` AS `fddbrdh`, + t1.`address` AS `address`, + t1.`lxdh` AS `lxdh`, + t1.`org_lng` AS `org_lng`, + t1.`org_lat` AS `org_lat`, + t1.`sort_id` AS `sort_id`, + cast( t1.`b_use` AS signed ) AS `b_use`, + t1.`province_code` AS `province_code`, + t1.`city_code` AS `city_code`, + t1.`district_code` AS `district_code`, + t1.`area_code` AS `area_code`, + t1.`last_updated_time` AS `last_updated_time`, + DATE_FORMAT(t1.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`, + case t1.`b_use` when -1 then 1 else 0 end AS `del_flag` + FROM + `t_base_organization` as t1 +