master
huanghai 5 years ago
parent c17dda8d67
commit bb18d1dd1b

@ -1,15 +1,20 @@
package DataEx
import (
"dsBaseRpc/Const"
"dsBaseRpc/Utils/CommonUtil"
"dsBaseRpc/Utils/DbUtil"
"dsBaseRpc/Utils/LogUtil"
"dsBaseRpc/Utils/FileUtil"
"encoding/json"
"fmt"
"time"
)
var db = DbUtil.Engine
//有同步哪些表
// 日志文件路径
var progressFilePath = "/usr/local/SyncDataLogs/"
//有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序
var sqlDict = []string{"t_base_organization", "t_base_class"}
/**
@ -19,20 +24,134 @@ var sqlDict = []string{"t_base_organization", "t_base_class"}
*/
func DataExchange() {
for {
//本轮上报的数量如果是0休息10秒后再继续上传
count:=0
//遍历所有的配置节,进行循环
for i := range sqlDict {
//判断日志目录下的记录是不是存在?
//存在则读取last_updated_time+id_int进行分页获取
paramMap := map[string]interface{}{"last_updated_time": "2020-07-15 00:00:00"}
list, err := db.SqlMapClient(sqlDict[i], &paramMap).Query().List()
if err != nil {
LogUtil.Error(Const.DataBaseActionError, "数据上报时发生了严重错误:"+err.Error())
//本轮上报的数量如果是0休息5秒后再继续上传
postCount := UploadData()
if postCount == 0 {
fmt.Println("同步本轮没有可以上报的数据将休息5秒")
time.Sleep(5 * 1e9)
}
}
}
// 日志文件对应的结构体
type logStruct struct {
startUpdateTs string
idInt int
}
/**
*
*
* 2019-01-02
*/
func UploadData() int {
var postCount = 0
//遍历所有的配置节,进行循环
for i := range sqlDict {
count := 0
//默认的开始时间
startUpdateTs := "1970-01-01 00:00:00"
//默认的整数主键值
idInt := 0
//查询结果集
var list []map[string]interface{}
//表名
tableName := sqlDict[i]
//日志文件位置
logName := progressFilePath + tableName + ".log"
//判断文件是不是存在
if FileUtil.PathExists(logName) {
//读取配置信息
var logstruct logStruct
if err := json.Unmarshal([]byte(FileUtil.ReadFileContent(logName)), &logstruct); err == nil {
startUpdateTs = logstruct.startUpdateTs
idInt = logstruct.idInt
} else {
fmt.Println("解析JSON文件失败:" + err.Error())
}
}
//一直循环
for {
//获取数据
startUpdateTs, idInt, list = getRecord(tableName, startUpdateTs, idInt)
if len(list) == 0 {
break
}
fmt.Println(list)
//上报到Http Api
//TODO
//记录上报数量
count = count + len(list)
//记录日志
var l logStruct
l.idInt = idInt
l.startUpdateTs = startUpdateTs
jsonBytes, _ := json.Marshal(l)
FileUtil.WriteContent(logName, string(jsonBytes))
//提示信息
fmt.Println("同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) +
"个start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertIntToString(idInt))
}
postCount = postCount + count
}
return postCount
}
/**
*
*
* 2019-01-16
*/
func getRecord(tableName string, lastUpdatedTime string, idInt int) (string, int, []map[string]interface{}) {
//每次获取的条数
limit := 100
//SQL内容
sqlPublic := FileUtil.ReadFileContent("./Sql/" + tableName + ".sql")
//时间戳相等的SQL
eqSql := sqlPublic + " where t1.last_updated_time =? and t1.id_int>? order by t1.last_updated_time,t1.id_int limit ?"
//时间戳大于的SQL
gtSql := sqlPublic + " where t1.last_updated_time >? order by t1.last_updated_time,t1.id_int limit ?"
//(1)计算相等的
var rsEq = make([]map[string]interface{}, 0)
var rsGt = make([]map[string]interface{}, 0)
lastUpdatedTime, idInt, rsEq = getrecordEq(eqSql, lastUpdatedTime, idInt, limit)
if len(rsEq) < limit {
//尝试一次gt操作(组合一下)
lastUpdatedTime, idInt, rsGt = getrecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq))
if len(rsGt) > 0 {
//拼装一下,合并两个结果集
for i := range rsGt {
rsEq = append(rsEq, rsGt[i])
}
//更改一下
lastUpdatedTime = rsGt[len(rsGt)-1]["last_updated_time"].(string)
idInt = rsGt[len(rsGt)-1]["id_int"].(int)
}
}
return lastUpdatedTime, idInt, rsEq
}
func getrecordEq(eqSql string, lastUpdatedTime string, idInt int, limit int) (string, int, []map[string]interface{}) {
rs, _ := db.SQL(eqSql, lastUpdatedTime, idInt, limit).Query().List()
if rs != nil && len(rs) > 0 {
//取出最后一行的数据进行回写
lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string)
idInt = rs[len(rs)-1]["id_int"].(int)
return lastUpdatedTime, idInt, rs
} else {
return lastUpdatedTime, idInt, nil
}
}
func getrecordGt(gtSql string, lastUpdatedTime string, idInt int, limit int) (string, int, []map[string]interface{}) {
rs, _ := db.SQL(gtSql, lastUpdatedTime, limit).Query().List()
if rs != nil && len(rs) > 0 {
//取出最后一行的数据进行回写
lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string)
idInt := rs[len(rs)-1]["id_int"].(int)
return lastUpdatedTime, idInt, rs
} else {
return lastUpdatedTime, idInt, nil
}
}

@ -1,79 +0,0 @@
<sqlMap>
<sql id="t_base_organization">
SELECT
cast( `t_base_organization`.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`,
`t_base_organization`.`id_int` AS `id_int`,
`t_base_organization`.`org_code` AS `org_code`,
`t_base_organization`.`org_name` AS `org_name`,
cast( `t_base_organization`.`parent_id` AS CHAR ( 36 ) charset utf8 ) AS `parent_id`,
cast( `t_base_organization`.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`,
`t_base_organization`.`org_type` AS `org_type`,
`t_base_organization`.`edu_assist_type` AS `edu_assist_type`,
`t_base_organization`.`main_school_type` AS `main_school_type`,
cast( `t_base_organization`.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`,
cast( `t_base_organization`.`manage_org_id` AS CHAR ( 36 ) charset utf8 ) AS `manage_org_id`,
`t_base_organization`.`directly_under_type` AS `directly_under_type`,
`t_base_organization`.`xxbbm` AS `xxbbm`,
`t_base_organization`.`xxbxlxm` AS `xxbxlxm`,
`t_base_organization`.`szdcxlxm` AS `szdcxlxm`,
`t_base_organization`.`xxjbzm` AS `xxjbzm`,
cast( `t_base_organization`.`fzr` AS CHAR ( 36 ) charset utf8 ) AS `fzr`,
`t_base_organization`.`fddbr` AS `fddbr`,
`t_base_organization`.`fddbrdh` AS `fddbrdh`,
`t_base_organization`.`address` AS `address`,
`t_base_organization`.`lxdh` AS `lxdh`,
`t_base_organization`.`org_lng` AS `org_lng`,
`t_base_organization`.`org_lat` AS `org_lat`,
`t_base_organization`.`sort_id` AS `sort_id`,
cast( `t_base_organization`.`b_use` AS signed ) AS `b_use`,
`t_base_organization`.`province_code` AS `province_code`,
`t_base_organization`.`city_code` AS `city_code`,
`t_base_organization`.`district_code` AS `district_code`,
`t_base_organization`.`area_code` AS `area_code`,
`t_base_organization`.`last_updated_time` AS `last_updated_time`,
DATE_FORMAT(`t_base_organization`.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`,
case `t_base_organization`.`b_use` when -1 then 1 else 0 end AS `del_flag`
FROM
`t_base_organization`
WHERE
`t_base_organization`.`last_updated_time`>?last_updated_time
ORDER BY
`t_base_organization`.`last_updated_time`
</sql>
<sql id="t_base_class">
SELECT
cast( `t_base_class`.`class_id` AS CHAR ( 36 ) charset utf8 ) AS `class_id`,
`t_base_class`.`id_int` AS `id_int`,
`t_base_class`.`bh` AS `bh`,
`t_base_class`.`class_code` AS `class_code`,
`t_base_class`.`class_name` AS `class_name`,
`t_base_class`.`class_alias` AS `class_alias`,
`t_base_class`.`rxnf` AS `rxnf`,
`t_base_class`.`rxjj` AS `rxjj`,
`t_base_class`.`schooling_length` AS `schooling_length`,
`t_base_class`.`stage_id` AS `stage_id`,
cast( `t_base_class`.`teacher_id` AS CHAR ( 36 ) charset utf8 ) AS `teacher_id`,
cast( `t_base_class`.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`,
cast( `t_base_class`.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`,
cast( `t_base_class`.`b_use` AS signed ) AS `b_use`,
`t_base_class`.`province_code` AS `province_code`,
`t_base_class`.`city_code` AS `city_code`,
`t_base_class`.`district_code` AS `district_code`,
cast( `t_base_class`.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`,
DATE_FORMAT(`t_base_class`.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`,
`t_base_class`.`last_updated_time` AS `last_updated_time`,
`t_dm_stage`.`stage_name` AS `stage_name`,
`t_base_organization`.`org_name` AS `org_name` ,
case `t_base_class`.`b_use` when -1 then 1 else 0 end AS `del_flag`
FROM
((
`t_base_class`
JOIN `t_dm_stage` ON ( `t_base_class`.`stage_id` = `t_dm_stage`.`stage_id` ))
JOIN `t_base_organization` ON ( `t_base_class`.`bureau_id` = `t_base_organization`.`org_id` ))
WHERE
`t_base_class`.`last_updated_time`>?last_updated_time
ORDER BY
`t_base_class`.`last_updated_time`
</sql>
</sqlMap>

@ -0,0 +1,28 @@
SELECT
cast( t1.`class_id` AS CHAR ( 36 ) charset utf8 ) AS `class_id`,
t1.`id_int` AS `id_int`,
t1.`bh` AS `bh`,
t1.`class_code` AS `class_code`,
t1.`class_name` AS `class_name`,
t1.`class_alias` AS `class_alias`,
t1.`rxnf` AS `rxnf`,
t1.`rxjj` AS `rxjj`,
t1.`schooling_length` AS `schooling_length`,
t1.`stage_id` AS `stage_id`,
cast( t1.`teacher_id` AS CHAR ( 36 ) charset utf8 ) AS `teacher_id`,
cast( t1.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`,
cast( t1.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`,
cast( t1.`b_use` AS signed ) AS `b_use`,
t1.`province_code` AS `province_code`,
t1.`city_code` AS `city_code`,
t1.`district_code` AS `district_code`,
cast( t1.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`,
DATE_FORMAT(t1.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`,
t1.`last_updated_time` AS `last_updated_time`,
`t_dm_stage`.`stage_name` AS `stage_name`,
`t_base_organization`.`org_name` AS `org_name` ,
case t1.`b_use` when -1 then 1 else 0 end AS `del_flag`
FROM
((t_base_class as t1 JOIN `t_dm_stage` ON ( t1.`stage_id` = `t_dm_stage`.`stage_id` ))
JOIN `t_base_organization` ON ( t1.`bureau_id` = `t_base_organization`.`org_id` ))

@ -0,0 +1,36 @@
SELECT
cast( t1.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`,
t1.`id_int` AS `id_int`,
t1.`org_code` AS `org_code`,
t1.`org_name` AS `org_name`,
cast( t1.`parent_id` AS CHAR ( 36 ) charset utf8 ) AS `parent_id`,
cast( t1.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`,
t1.`org_type` AS `org_type`,
t1.`edu_assist_type` AS `edu_assist_type`,
t1.`main_school_type` AS `main_school_type`,
cast( t1.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`,
cast( t1.`manage_org_id` AS CHAR ( 36 ) charset utf8 ) AS `manage_org_id`,
t1.`directly_under_type` AS `directly_under_type`,
t1.`xxbbm` AS `xxbbm`,
t1.`xxbxlxm` AS `xxbxlxm`,
t1.`szdcxlxm` AS `szdcxlxm`,
t1.`xxjbzm` AS `xxjbzm`,
cast( t1.`fzr` AS CHAR ( 36 ) charset utf8 ) AS `fzr`,
t1.`fddbr` AS `fddbr`,
t1.`fddbrdh` AS `fddbrdh`,
t1.`address` AS `address`,
t1.`lxdh` AS `lxdh`,
t1.`org_lng` AS `org_lng`,
t1.`org_lat` AS `org_lat`,
t1.`sort_id` AS `sort_id`,
cast( t1.`b_use` AS signed ) AS `b_use`,
t1.`province_code` AS `province_code`,
t1.`city_code` AS `city_code`,
t1.`district_code` AS `district_code`,
t1.`area_code` AS `area_code`,
t1.`last_updated_time` AS `last_updated_time`,
DATE_FORMAT(t1.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`,
case t1.`b_use` when -1 then 1 else 0 end AS `del_flag`
FROM
`t_base_organization` as t1
Loading…
Cancel
Save