package DataExchange import ( "dsBaseRpc/Utils/CommonUtil" "dsBaseRpc/Utils/DbUtil" "dsBaseRpc/Utils/FileUtil" "encoding/json" "fmt" "os" "time" ) var db = DbUtil.Engine // 日志文件路径 var progressFilePath = "/usr/local/SyncDataLogs/" //有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 var sqlDict = []string{"t_base_organization", "t_base_class"} /** 功能:初始化目录 */ func init() { if !FileUtil.PathExists(progressFilePath) { os.MkdirAll(progressFilePath, os.ModePerm) } } /** 功能:数据上报 作者:黄海 时间:2020-07-16 */ func DataExchange() { for { //本轮上报的数量,如果是0,休息5秒后再继续上传 postCount := UploadData() if postCount == 0 { fmt.Println("同步:本轮没有可以上报的数据,将休息5秒!") time.Sleep(5 * 1e9) } } } // 日志文件对应的结构体 type logStruct struct { StartUpdateTs string `json:"start_update_ts"` IdInt int64 `json:"id_int"` } /** * 功能:上报所有大于给定时间戳的数据 * 作者:黄海 * 时间:2019-01-02 */ func UploadData() int { var postCount = 0 //遍历所有的配置节,进行循环 for i := range sqlDict { count := 0 //默认的开始时间 startUpdateTs := "1970-01-01 00:00:00" //默认的整数主键值 var idInt int64 = 0 //查询结果集 var list []map[string]interface{} //表名 tableName := sqlDict[i] //日志文件位置 logName := progressFilePath + tableName + ".log" //判断文件是不是存在 if FileUtil.PathExists(logName) { //读取配置信息 var logstruct logStruct if err := json.Unmarshal([]byte(FileUtil.ReadFileContent(logName)), &logstruct); err == nil { startUpdateTs = logstruct.StartUpdateTs idInt = logstruct.IdInt } else { fmt.Println("解析JSON文件失败:" + err.Error()) } } //一直循环 for { //获取数据 startUpdateTs, idInt, list = getRecord(tableName, startUpdateTs, idInt) if len(list) == 0 { break } //上报到Http Api //TODO //记录上报数量 count = count + len(list) //记录日志 var l logStruct l.IdInt = idInt l.StartUpdateTs = startUpdateTs jsonBytes, err := json.Marshal(l) if err != nil { fmt.Println(err.Error()) } FileUtil.WriteContent(logName, string(jsonBytes)) //提示信息 fmt.Println("同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + "个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertInt64ToString(idInt)) } postCount = postCount + count } return postCount } /** * 功能:获取一个表的指定大小的数据 * 作者:黄海 * 时间:2019-01-16 */ func getRecord(tableName string, lastUpdatedTime string, idInt int64) (string, int64, []map[string]interface{}) { //每次获取的条数 limit := 100 //SQL内容 sqlPublic := FileUtil.ReadFileContent("./Sql/" + tableName + ".sql") //时间戳相等的SQL eqSql := sqlPublic + " where t1.last_updated_time =? and t1.id_int>? order by t1.last_updated_time,t1.id_int limit ?" //时间戳大于的SQL gtSql := sqlPublic + " where t1.last_updated_time >? order by t1.last_updated_time,t1.id_int limit ?" //(1)计算相等的 var rsEq = make([]map[string]interface{}, 0) var rsGt = make([]map[string]interface{}, 0) lastUpdatedTime, idInt, rsEq = getRecordEq(eqSql, lastUpdatedTime, idInt, limit) if len(rsEq) < limit { //尝试一次gt操作(组合一下) lastUpdatedTime, idInt, rsGt = getRecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq)) if len(rsGt) > 0 { //拼装一下,合并两个结果集 for i := range rsGt { rsEq = append(rsEq, rsGt[i]) } //更改一下 lastUpdatedTime = rsGt[len(rsGt)-1]["last_updated_time"].(string) idInt = rsGt[len(rsGt)-1]["id_int"].(int64) } } return lastUpdatedTime, idInt, rsEq } /** 功能:获取时间戳一致的数据 作者:黄海 时间:2020-7-17 */ func getRecordEq(eqSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) { rs, _ := db.SQL(eqSql, lastUpdatedTime, idInt, limit).Query().List() if rs != nil && len(rs) > 0 { //取出最后一行的数据进行回写 lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) idInt = rs[len(rs)-1]["id_int"].(int64) return lastUpdatedTime, idInt, rs } else { return lastUpdatedTime, idInt, nil } } /** 功能:获取大于指定时间戳的数据 作者:黄海 时间:2020-7-17 */ func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) { rs, _ := db.SQL(gtSql, lastUpdatedTime, limit).Query().List() if rs != nil && len(rs) > 0 { //取出最后一行的数据进行回写 lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) idInt := rs[len(rs)-1]["id_int"].(int64) return lastUpdatedTime, idInt, rs } else { return lastUpdatedTime, idInt, nil } }