|
|
|
@ -16,6 +16,12 @@ import (
|
|
|
|
|
|
|
|
|
|
var db = DbUtil.Engine
|
|
|
|
|
|
|
|
|
|
//每次获取的条数
|
|
|
|
|
var limit = 200
|
|
|
|
|
|
|
|
|
|
//默认开始时间
|
|
|
|
|
var defaultStartTs = "1970-01-01 00:00:00"
|
|
|
|
|
|
|
|
|
|
// 日志文件路径
|
|
|
|
|
var progressFilePath = "/usr/local/SyncDataLogs/"
|
|
|
|
|
|
|
|
|
@ -61,6 +67,74 @@ func init() {
|
|
|
|
|
os.MkdirAll(progressFilePath, os.ModePerm)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
功能:组织机构上报
|
|
|
|
|
作者:黄海
|
|
|
|
|
时间:2020-07-17
|
|
|
|
|
*/
|
|
|
|
|
func InitOrg() {
|
|
|
|
|
//(1)是不是进行过首次上报,如果没有话,需要执行一次组织机构上报
|
|
|
|
|
logName := progressFilePath + "t_base_organization.log"
|
|
|
|
|
//判断文件是不是存在
|
|
|
|
|
if !FileUtil.PathExists(logName) {
|
|
|
|
|
//上报组织机构,读取t_base_organization的SQL脚本
|
|
|
|
|
//SQL内容
|
|
|
|
|
sql := FileUtil.ReadFileContent("./Sql/t_base_organization.sql")
|
|
|
|
|
//组织机构是需要按org_type,area_code排序
|
|
|
|
|
orgSql := sql + " order by t1.org_type,t1.area_code"
|
|
|
|
|
list, _ := db.SQL(orgSql).Query().List()
|
|
|
|
|
|
|
|
|
|
var t tableStruct
|
|
|
|
|
for i := range sqlDict {
|
|
|
|
|
if sqlDict[i].TableName == "t_base_organization" {
|
|
|
|
|
t = sqlDict[i]
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
var count = 0
|
|
|
|
|
var isFinish = false
|
|
|
|
|
for {
|
|
|
|
|
if isFinish {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
//利用切片分批次上报
|
|
|
|
|
if len(list) > limit {
|
|
|
|
|
success := PostToServer(t, list[0:limit]) //0-99不包含100
|
|
|
|
|
if !success {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
count = count + limit
|
|
|
|
|
list = list[limit:]
|
|
|
|
|
} else if len(list) > 0 {
|
|
|
|
|
success := PostToServer(t, list)
|
|
|
|
|
if !success {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
count = count + len(list)
|
|
|
|
|
isFinish = true
|
|
|
|
|
} else {
|
|
|
|
|
isFinish = true
|
|
|
|
|
}
|
|
|
|
|
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成组织机构初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!")
|
|
|
|
|
}
|
|
|
|
|
//记录日志
|
|
|
|
|
maxSql:=sql+" order by t1.last_updated_time,t1.id_int desc limit 1"
|
|
|
|
|
var l logStruct
|
|
|
|
|
l.IdInt = 0
|
|
|
|
|
l.StartUpdateTs = defaultStartTs
|
|
|
|
|
//取得最后一行的最大值
|
|
|
|
|
list,_=db.SQL(maxSql).Query().List()
|
|
|
|
|
if len(list) > 0 {
|
|
|
|
|
l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string)
|
|
|
|
|
l.IdInt = list[len(list)-1]["id_int"].(int64)
|
|
|
|
|
}
|
|
|
|
|
jsonBytes, err := json.Marshal(l)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println(err.Error())
|
|
|
|
|
}
|
|
|
|
|
FileUtil.WriteContent(logName, string(jsonBytes))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
功能:数据上报
|
|
|
|
@ -68,8 +142,11 @@ func init() {
|
|
|
|
|
时间:2020-07-16
|
|
|
|
|
*/
|
|
|
|
|
func DataExchange() {
|
|
|
|
|
//死循环上报中
|
|
|
|
|
for {
|
|
|
|
|
//本轮上报的数量,如果是0,休息5秒后再继续上传
|
|
|
|
|
//(1)组织机构上报
|
|
|
|
|
InitOrg()
|
|
|
|
|
//(2)本轮上报的数量,如果是0,休息5秒后再继续上传
|
|
|
|
|
postCount := UploadData()
|
|
|
|
|
if postCount == 0 {
|
|
|
|
|
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:本轮没有可以上报的数据,将休息5秒!")
|
|
|
|
@ -89,7 +166,7 @@ func UploadData() int {
|
|
|
|
|
for i := range sqlDict {
|
|
|
|
|
count := 0
|
|
|
|
|
//默认的开始时间
|
|
|
|
|
startUpdateTs := "1970-01-01 00:00:00"
|
|
|
|
|
startUpdateTs := defaultStartTs
|
|
|
|
|
//默认的整数主键值
|
|
|
|
|
var idInt int64 = 0
|
|
|
|
|
//查询结果集
|
|
|
|
@ -117,29 +194,8 @@ func UploadData() int {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
//上报到Http Api--->Body--->Post
|
|
|
|
|
var ps postStruct
|
|
|
|
|
ps.DataSource = sqlDict[i].DataSource
|
|
|
|
|
ps.AuthToken = "DSDataex_Token_eb4ab2fea87161dc08fa794a648584c4"
|
|
|
|
|
ps.SystemId = "BASE_GO"
|
|
|
|
|
|
|
|
|
|
var dsMap = make([]dataStruct, 0)
|
|
|
|
|
for k := range list {
|
|
|
|
|
var ds dataStruct
|
|
|
|
|
ds.Data, _ = CommonUtil.MapToJson(list[k])
|
|
|
|
|
ds.DataId = list[k][sqlDict[i].PrimaryKey].(string)
|
|
|
|
|
ds.DelFlag = list[k]["del_flag"].(int64)
|
|
|
|
|
ds.OrgId = list[k]["bureau_id"].(string)
|
|
|
|
|
dsMap = append(dsMap, ds)
|
|
|
|
|
}
|
|
|
|
|
ps.Datas = dsMap
|
|
|
|
|
//将Struct转为json
|
|
|
|
|
jsonBytes, _ := json.Marshal(ps)
|
|
|
|
|
msg := string(jsonBytes)
|
|
|
|
|
//提交到汇集中心
|
|
|
|
|
p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg)
|
|
|
|
|
if !p.Success {
|
|
|
|
|
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:"+p.Message)
|
|
|
|
|
time.Sleep(5 * 1e9)
|
|
|
|
|
success := PostToServer(sqlDict[i], list)
|
|
|
|
|
if !success {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
//记录上报数量
|
|
|
|
@ -162,16 +218,49 @@ func UploadData() int {
|
|
|
|
|
return postCount
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
功能:将数据批量上报到汇集中心
|
|
|
|
|
作者:黄海
|
|
|
|
|
时间:2020-07-17
|
|
|
|
|
*/
|
|
|
|
|
func PostToServer(t tableStruct, list []map[string]interface{}) bool {
|
|
|
|
|
//上报到Http Api--->Body--->Post
|
|
|
|
|
var ps postStruct
|
|
|
|
|
ps.DataSource = t.DataSource
|
|
|
|
|
ps.AuthToken = ConfigUtil.DataExchangeAuthToken
|
|
|
|
|
ps.SystemId = ConfigUtil.DataExchangeSystemId
|
|
|
|
|
|
|
|
|
|
var dsMap = make([]dataStruct, 0)
|
|
|
|
|
for k := range list {
|
|
|
|
|
var ds dataStruct
|
|
|
|
|
ds.Data, _ = CommonUtil.MapToJson(list[k])
|
|
|
|
|
ds.DataId = list[k][t.PrimaryKey].(string)
|
|
|
|
|
ds.DelFlag = list[k]["del_flag"].(int64)
|
|
|
|
|
ds.OrgId = list[k]["bureau_id"].(string)
|
|
|
|
|
dsMap = append(dsMap, ds)
|
|
|
|
|
}
|
|
|
|
|
ps.Datas = dsMap
|
|
|
|
|
//将Struct转为json
|
|
|
|
|
jsonBytes, _ := json.Marshal(ps)
|
|
|
|
|
msg := string(jsonBytes)
|
|
|
|
|
//提交到汇集中心
|
|
|
|
|
p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg)
|
|
|
|
|
if !p.Success{
|
|
|
|
|
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:" + p.Message)
|
|
|
|
|
time.Sleep(5 * 1e9)
|
|
|
|
|
}
|
|
|
|
|
return p.Success
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 功能:获取一个表的指定大小的数据
|
|
|
|
|
* 作者:黄海
|
|
|
|
|
* 时间:2019-01-16
|
|
|
|
|
*/
|
|
|
|
|
func getRecord(tableName string, lastUpdatedTime string, idInt int64) (string, int64, []map[string]interface{}) {
|
|
|
|
|
//每次获取的条数
|
|
|
|
|
limit := 100
|
|
|
|
|
//SQL内容
|
|
|
|
|
sqlPublic := FileUtil.ReadFileContent("./Sql/" + tableName + ".sql")
|
|
|
|
|
|
|
|
|
|
//时间戳相等的SQL
|
|
|
|
|
eqSql := sqlPublic + " where t1.last_updated_time =? and t1.id_int>? order by t1.last_updated_time,t1.id_int limit ?"
|
|
|
|
|
//时间戳大于的SQL
|
|
|
|
@ -231,22 +320,16 @@ func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) (
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type FailResultStruct struct {
|
|
|
|
|
FailId string `json:"fail_id"`
|
|
|
|
|
FailReason string `json:"fail_reason"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type ResultStruct struct {
|
|
|
|
|
Message string `json:"message"`
|
|
|
|
|
Success bool `json:"success"`
|
|
|
|
|
FailResults []FailResultStruct `json:"fail_results"`
|
|
|
|
|
Message string `json:"message"`
|
|
|
|
|
Success bool `json:"success"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 基础方法,这里多用于访问webapi,配合上json转换。此方法可以运行但是不算完善。
|
|
|
|
|
// 基础方法,这里多用于访问webapi,配合上json转换。
|
|
|
|
|
func httpDo(method string, url string, msg string) ResultStruct {
|
|
|
|
|
var p ResultStruct
|
|
|
|
|
p.Success=false
|
|
|
|
|
p.Message="上报到汇集系统失败!"
|
|
|
|
|
p.Success = false
|
|
|
|
|
p.Message = "上报到汇集系统失败!"
|
|
|
|
|
client := &http.Client{}
|
|
|
|
|
body := bytes.NewBuffer([]byte(msg))
|
|
|
|
|
req, err := http.NewRequest(method,
|
|
|
|
|