master
huanghai 5 years ago
parent ef77b2f4eb
commit 64789038a6

@ -6,6 +6,7 @@ import (
"dsBaseRpc/Utils/FileUtil"
"encoding/json"
"fmt"
"os"
"time"
)
@ -17,6 +18,15 @@ var progressFilePath = "/usr/local/SyncDataLogs/"
//有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序
var sqlDict = []string{"t_base_organization", "t_base_class"}
/**
*/
func init() {
if !FileUtil.PathExists(progressFilePath) {
os.MkdirAll(progressFilePath, os.ModePerm)
}
}
/**
@ -35,8 +45,8 @@ func DataExchange() {
// 日志文件对应的结构体
type logStruct struct {
startUpdateTs string
idInt int
StartUpdateTs string `json:"start_update_ts"`
IdInt int64 `json:"id_int"`
}
/**
@ -52,7 +62,7 @@ func UploadData() int {
//默认的开始时间
startUpdateTs := "1970-01-01 00:00:00"
//默认的整数主键值
idInt := 0
var idInt int64 = 0
//查询结果集
var list []map[string]interface{}
//表名
@ -64,8 +74,8 @@ func UploadData() int {
//读取配置信息
var logstruct logStruct
if err := json.Unmarshal([]byte(FileUtil.ReadFileContent(logName)), &logstruct); err == nil {
startUpdateTs = logstruct.startUpdateTs
idInt = logstruct.idInt
startUpdateTs = logstruct.StartUpdateTs
idInt = logstruct.IdInt
} else {
fmt.Println("解析JSON文件失败:" + err.Error())
}
@ -84,13 +94,16 @@ func UploadData() int {
count = count + len(list)
//记录日志
var l logStruct
l.idInt = idInt
l.startUpdateTs = startUpdateTs
jsonBytes, _ := json.Marshal(l)
l.IdInt = idInt
l.StartUpdateTs = startUpdateTs
jsonBytes, err := json.Marshal(l)
if err != nil {
fmt.Println(err.Error())
}
FileUtil.WriteContent(logName, string(jsonBytes))
//提示信息
fmt.Println("同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) +
"个start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertIntToString(idInt))
"个start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertInt64ToString(idInt))
}
postCount = postCount + count
}
@ -102,7 +115,7 @@ func UploadData() int {
*
* 2019-01-16
*/
func getRecord(tableName string, lastUpdatedTime string, idInt int) (string, int, []map[string]interface{}) {
func getRecord(tableName string, lastUpdatedTime string, idInt int64) (string, int64, []map[string]interface{}) {
//每次获取的条数
limit := 100
//SQL内容
@ -114,11 +127,11 @@ func getRecord(tableName string, lastUpdatedTime string, idInt int) (string, int
//(1)计算相等的
var rsEq = make([]map[string]interface{}, 0)
var rsGt = make([]map[string]interface{}, 0)
lastUpdatedTime, idInt, rsEq = getrecordEq(eqSql, lastUpdatedTime, idInt, limit)
lastUpdatedTime, idInt, rsEq = getRecordEq(eqSql, lastUpdatedTime, idInt, limit)
if len(rsEq) < limit {
//尝试一次gt操作(组合一下)
lastUpdatedTime, idInt, rsGt = getrecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq))
lastUpdatedTime, idInt, rsGt = getRecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq))
if len(rsGt) > 0 {
//拼装一下,合并两个结果集
for i := range rsGt {
@ -126,30 +139,40 @@ func getRecord(tableName string, lastUpdatedTime string, idInt int) (string, int
}
//更改一下
lastUpdatedTime = rsGt[len(rsGt)-1]["last_updated_time"].(string)
idInt = rsGt[len(rsGt)-1]["id_int"].(int)
idInt = rsGt[len(rsGt)-1]["id_int"].(int64)
}
}
return lastUpdatedTime, idInt, rsEq
}
func getrecordEq(eqSql string, lastUpdatedTime string, idInt int, limit int) (string, int, []map[string]interface{}) {
/**
2020-7-17
*/
func getRecordEq(eqSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) {
rs, _ := db.SQL(eqSql, lastUpdatedTime, idInt, limit).Query().List()
if rs != nil && len(rs) > 0 {
//取出最后一行的数据进行回写
lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string)
idInt = rs[len(rs)-1]["id_int"].(int)
idInt = rs[len(rs)-1]["id_int"].(int64)
return lastUpdatedTime, idInt, rs
} else {
return lastUpdatedTime, idInt, nil
}
}
func getrecordGt(gtSql string, lastUpdatedTime string, idInt int, limit int) (string, int, []map[string]interface{}) {
/**
2020-7-17
*/
func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) {
rs, _ := db.SQL(gtSql, lastUpdatedTime, limit).Query().List()
if rs != nil && len(rs) > 0 {
//取出最后一行的数据进行回写
lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string)
idInt := rs[len(rs)-1]["id_int"].(int)
idInt := rs[len(rs)-1]["id_int"].(int64)
return lastUpdatedTime, idInt, rs
} else {
return lastUpdatedTime, idInt, nil

Loading…
Cancel
Save