|
|
package DataExchange
|
|
|
|
|
|
import (
|
|
|
"bytes"
|
|
|
"crypto/md5"
|
|
|
"dsBaseRpc/Utils/CommonUtil"
|
|
|
"dsBaseRpc/Utils/ConfigUtil"
|
|
|
"dsBaseRpc/Utils/DbUtil"
|
|
|
"dsBaseRpc/Utils/FileUtil"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
"io/ioutil"
|
|
|
"net/http"
|
|
|
"os"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
var db = DbUtil.Engine
|
|
|
|
|
|
//每次获取的条数
|
|
|
var limit = 200
|
|
|
|
|
|
//默认开始时间
|
|
|
var defaultStartTs = "1970-01-01 00:00:00"
|
|
|
|
|
|
// 日志文件路径
|
|
|
var progressFilePath = "/usr/local/SyncDataLogs/"
|
|
|
|
|
|
//建立与汇集中心的主题映射关系结构
|
|
|
type tableStruct struct {
|
|
|
TableName string `json:"table_name"`
|
|
|
PrimaryKey string `json:"primary_key"`
|
|
|
DataSource string `json:"data_source"`
|
|
|
}
|
|
|
|
|
|
//有同步哪些表(增量),之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序
|
|
|
var IncrSqlDict = []tableStruct{
|
|
|
{TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"},
|
|
|
{TableName: "t_base_class", PrimaryKey: "class_id", DataSource: "org_class"},
|
|
|
{TableName: "t_base_teacher", PrimaryKey: "teacher_id", DataSource: "user_teacher"},
|
|
|
{TableName: "t_base_student", PrimaryKey: "student_id", DataSource: "user_student"},
|
|
|
{TableName: "t_sys_loginperson_log", PrimaryKey: "id_int", DataSource: "log_login"},
|
|
|
{TableName: "t_base_teacher_org", PrimaryKey: "id", DataSource: "user_teacher_org"},
|
|
|
}
|
|
|
|
|
|
// 全量数据上报
|
|
|
var FullSqlDict = []tableStruct{
|
|
|
{TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"},
|
|
|
{TableName: "t_sys_dict", PrimaryKey: "dict_id", DataSource: "sys_dic"},
|
|
|
{TableName: "t_gov_area", PrimaryKey: "area_code", DataSource: "org_area"},
|
|
|
}
|
|
|
|
|
|
// 数据上报的结构体
|
|
|
type postStruct struct {
|
|
|
AuthToken string `json:"auth_token"`
|
|
|
DataSource string `json:"data_source"`
|
|
|
SystemId string `json:"system_id"`
|
|
|
Datas []dataStruct `json:"datas"`
|
|
|
}
|
|
|
|
|
|
type dataStruct struct {
|
|
|
Data string `json:"data"`
|
|
|
DataId string `json:"data_id"`
|
|
|
DelFlag int64 `json:"del_flag"`
|
|
|
OrgId string `json:"org_id"`
|
|
|
}
|
|
|
|
|
|
// 日志文件对应的结构体
|
|
|
type logStruct struct {
|
|
|
StartUpdateTs string `json:"start_update_ts"`
|
|
|
IdInt int64 `json:"id_int"`
|
|
|
}
|
|
|
|
|
|
//系统token
|
|
|
var SystemToken = ""
|
|
|
|
|
|
//是否成功
|
|
|
var success = false
|
|
|
|
|
|
/**
|
|
|
功能:初始化目录
|
|
|
*/
|
|
|
func init() {
|
|
|
if !FileUtil.PathExists(progressFilePath) {
|
|
|
os.MkdirAll(progressFilePath, os.ModePerm)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
功能:全量数据上报
|
|
|
作者:黄海
|
|
|
时间:2020-07-17
|
|
|
*/
|
|
|
func InitFull() {
|
|
|
for i := range FullSqlDict {
|
|
|
//(1)是不是进行过首次上报,如果没有话,需要执行一次组织机构上报
|
|
|
logName := progressFilePath + FullSqlDict[i].TableName + ".log"
|
|
|
//判断文件是不是存在
|
|
|
if !FileUtil.PathExists(logName) {
|
|
|
//SQL内容
|
|
|
isql := FileUtil.ReadFileContent("./Sql/" + FullSqlDict[i].TableName + ".sql")
|
|
|
sql := isql
|
|
|
var list []map[string]interface{}
|
|
|
//如果是组织机构表,那么需要变更一下查询的排序条件
|
|
|
if FullSqlDict[i].TableName == "t_base_organization" {
|
|
|
//组织机构是需要按org_type,area_code排序
|
|
|
sql = sql + " order by t1.org_type,t1.area_code"
|
|
|
}
|
|
|
list, _ = db.SQL(sql).Query().List()
|
|
|
|
|
|
var count = 0
|
|
|
var isFinish = false
|
|
|
for {
|
|
|
if isFinish {
|
|
|
break
|
|
|
}
|
|
|
//利用切片分批次上报
|
|
|
if len(list) > limit {
|
|
|
success := PostToServer(FullSqlDict[i], list[0:limit]) //0-99不包含100
|
|
|
if !success {
|
|
|
continue
|
|
|
}
|
|
|
count = count + limit
|
|
|
list = list[limit:]
|
|
|
} else if len(list) > 0 {
|
|
|
success := PostToServer(FullSqlDict[i], list)
|
|
|
if !success {
|
|
|
continue
|
|
|
}
|
|
|
count = count + len(list)
|
|
|
isFinish = true
|
|
|
} else {
|
|
|
isFinish = true
|
|
|
}
|
|
|
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成" + FullSqlDict[i].TableName + "初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!")
|
|
|
}
|
|
|
//对于组织机构进行特殊处理
|
|
|
if FullSqlDict[i].TableName == "t_base_organization" {
|
|
|
//记录日志
|
|
|
maxSql := isql + " order by t1.last_updated_time desc,t1.id_int desc limit 1"
|
|
|
var l logStruct
|
|
|
l.IdInt = 0
|
|
|
l.StartUpdateTs = defaultStartTs
|
|
|
//取得最后一行的最大值
|
|
|
list, _ = db.SQL(maxSql).Query().List()
|
|
|
if len(list) > 0 {
|
|
|
l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string)
|
|
|
l.IdInt = list[len(list)-1]["id_int"].(int64)
|
|
|
}
|
|
|
jsonBytes, err := json.Marshal(l)
|
|
|
if err != nil {
|
|
|
fmt.Println(err.Error())
|
|
|
}
|
|
|
FileUtil.WriteContent(logName, string(jsonBytes))
|
|
|
} else {
|
|
|
FileUtil.WriteContent(logName, "is finished!")
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
功能:数据上报
|
|
|
作者:黄海
|
|
|
时间:2020-07-16
|
|
|
*/
|
|
|
func DataExchange() {
|
|
|
//获取系统token
|
|
|
success, SystemToken = getSystemToken()
|
|
|
if !success {
|
|
|
return
|
|
|
}
|
|
|
//死循环上报中
|
|
|
for {
|
|
|
//(1)组织机构上报
|
|
|
InitFull()
|
|
|
//(2)本轮上报的数量,如果是0,休息5秒后再继续上传
|
|
|
postCount := UploadData()
|
|
|
if postCount == 0 {
|
|
|
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:本轮没有可以上报的数据,将休息5秒!")
|
|
|
time.Sleep(5 * 1e9)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 功能:上报所有大于给定时间戳的数据
|
|
|
* 作者:黄海
|
|
|
* 时间:2019-01-02
|
|
|
*/
|
|
|
func UploadData() int {
|
|
|
var postCount = 0
|
|
|
//遍历所有的配置节,进行循环
|
|
|
for i := range IncrSqlDict {
|
|
|
count := 0
|
|
|
//默认的开始时间
|
|
|
startUpdateTs := defaultStartTs
|
|
|
//默认的整数主键值
|
|
|
var idInt int64 = 0
|
|
|
//查询结果集
|
|
|
var list []map[string]interface{}
|
|
|
//表名
|
|
|
tableName := IncrSqlDict[i].TableName
|
|
|
//日志文件位置
|
|
|
logName := progressFilePath + tableName + ".log"
|
|
|
//判断文件是不是存在
|
|
|
if FileUtil.PathExists(logName) {
|
|
|
//读取配置信息
|
|
|
var logstruct logStruct
|
|
|
if err := json.Unmarshal([]byte(FileUtil.ReadFileContent(logName)), &logstruct); err == nil {
|
|
|
startUpdateTs = logstruct.StartUpdateTs
|
|
|
idInt = logstruct.IdInt
|
|
|
} else {
|
|
|
fmt.Println(CommonUtil.GetCurrentTime() + " 解析JSON文件失败:" + err.Error())
|
|
|
}
|
|
|
}
|
|
|
//一直循环
|
|
|
for {
|
|
|
//获取数据
|
|
|
startUpdateTs, idInt, list = getRecord(tableName, startUpdateTs, idInt)
|
|
|
if len(list) == 0 {
|
|
|
break
|
|
|
}
|
|
|
//上报到Http Api--->Body--->Post
|
|
|
success := PostToServer(IncrSqlDict[i], list)
|
|
|
if !success {
|
|
|
continue
|
|
|
}
|
|
|
//记录上报数量
|
|
|
count = count + len(list)
|
|
|
//记录日志
|
|
|
var l logStruct
|
|
|
l.IdInt = idInt
|
|
|
l.StartUpdateTs = startUpdateTs
|
|
|
jsonBytes, err := json.Marshal(l)
|
|
|
if err != nil {
|
|
|
fmt.Println(err.Error())
|
|
|
}
|
|
|
FileUtil.WriteContent(logName, string(jsonBytes))
|
|
|
//提示信息
|
|
|
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) +
|
|
|
"个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertInt64ToString(idInt))
|
|
|
}
|
|
|
postCount = postCount + count
|
|
|
}
|
|
|
return postCount
|
|
|
}
|
|
|
|
|
|
// 获取签名用的结构体
|
|
|
type authStruct struct {
|
|
|
AuthTime string `json:"auth_time"`
|
|
|
SystemId string `json:"system_id"`
|
|
|
SystemToken string `json:"system_token"`
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
功能:获取系统token
|
|
|
作者:黄海
|
|
|
时间:2020-07-22
|
|
|
*/
|
|
|
func getSystemToken() (bool, string) {
|
|
|
//(1)计算出system_token=MD5.hash(MD5.hash(system_id+auth_time)+system_key)
|
|
|
var as authStruct
|
|
|
as.AuthTime = CommonUtil.GetCurrentTime()
|
|
|
as.SystemId = ConfigUtil.DataExchangeSystemId
|
|
|
//计算 md5
|
|
|
w := md5.New()
|
|
|
io.WriteString(w, as.SystemId+as.AuthTime)
|
|
|
//将str写入到w中
|
|
|
md5str := fmt.Sprintf("%x", w.Sum(nil))
|
|
|
w = md5.New()
|
|
|
io.WriteString(w, md5str+ConfigUtil.DataExchangeSystemKey)
|
|
|
//将str写入到w中
|
|
|
md5str = fmt.Sprintf("%x", w.Sum(nil))
|
|
|
//系统token
|
|
|
as.SystemToken = md5str
|
|
|
|
|
|
//(2)根据system_token换取authToken
|
|
|
jsonBytes, _ := json.Marshal(as)
|
|
|
|
|
|
p := httpDo("POST", ConfigUtil.DataExchangeSystemAuthUrl, string(jsonBytes))
|
|
|
if !p.Success {
|
|
|
fmt.Println(CommonUtil.GetCurrentTime() + "获取认证签名失败!")
|
|
|
return false, "获取认证签名失败!"
|
|
|
}
|
|
|
return true, p.Message
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
功能:将数据批量上报到汇集中心
|
|
|
作者:黄海
|
|
|
时间:2020-07-17
|
|
|
*/
|
|
|
func PostToServer(t tableStruct, list []map[string]interface{}) bool {
|
|
|
//上报到Http Api--->Body--->Post
|
|
|
var ps postStruct
|
|
|
ps.DataSource = t.DataSource
|
|
|
ps.AuthToken = SystemToken //p.Message中记录了authToken
|
|
|
ps.SystemId = ConfigUtil.DataExchangeSystemId
|
|
|
var dsMap = make([]dataStruct, 0)
|
|
|
for k := range list {
|
|
|
var ds dataStruct
|
|
|
ds.Data, _ = CommonUtil.MapToJson(list[k])
|
|
|
switch list[k][t.PrimaryKey].(type) {
|
|
|
case int64:
|
|
|
ds.DataId = CommonUtil.ConvertInt64ToString(list[k][t.PrimaryKey].(int64))
|
|
|
break
|
|
|
default:
|
|
|
ds.DataId = list[k][t.PrimaryKey].(string)
|
|
|
break
|
|
|
}
|
|
|
ds.DelFlag = list[k]["del_flag"].(int64)
|
|
|
//在内层中需要手动删除掉del_flag
|
|
|
delete(list[k], "del_flag")
|
|
|
|
|
|
ds.OrgId = list[k]["bureau_id"].(string)
|
|
|
//如果是市,县管理员的话,那么这里的bureau_id就是区域码,而数据交换平台只认t_base_organization表中的org_id,需要换算一下
|
|
|
if len(ds.OrgId) == 6 {
|
|
|
ds.OrgId = "-1"
|
|
|
}
|
|
|
dsMap = append(dsMap, ds)
|
|
|
}
|
|
|
ps.Datas = dsMap
|
|
|
//将Struct转为json
|
|
|
jsonBytes, _ := json.Marshal(ps)
|
|
|
msg := string(jsonBytes)
|
|
|
//提交到汇集中心
|
|
|
p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg)
|
|
|
if !p.Success {
|
|
|
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:" + p.Message)
|
|
|
time.Sleep(5 * 1e9)
|
|
|
}
|
|
|
return p.Success
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 功能:获取一个表的指定大小的数据
|
|
|
* 作者:黄海
|
|
|
* 时间:2019-01-16
|
|
|
*/
|
|
|
func getRecord(tableName string, lastUpdatedTime string, idInt int64) (string, int64, []map[string]interface{}) {
|
|
|
//SQL内容
|
|
|
sqlPublic := FileUtil.ReadFileContent("./Sql/" + tableName + ".sql")
|
|
|
|
|
|
//时间戳相等的SQL
|
|
|
eqSql := sqlPublic + " where t1.last_updated_time =? and t1.id_int>? order by t1.last_updated_time,t1.id_int limit ?"
|
|
|
//时间戳大于的SQL
|
|
|
gtSql := sqlPublic + " where t1.last_updated_time >? order by t1.last_updated_time,t1.id_int limit ?"
|
|
|
//(1)计算相等的
|
|
|
var rsEq = make([]map[string]interface{}, 0)
|
|
|
var rsGt = make([]map[string]interface{}, 0)
|
|
|
lastUpdatedTime, idInt, rsEq = getRecordEq(eqSql, lastUpdatedTime, idInt, limit)
|
|
|
|
|
|
if rsEq == nil || len(rsEq) < limit {
|
|
|
//尝试一次gt操作(组合一下)
|
|
|
lastUpdatedTime, idInt, rsGt = getRecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq))
|
|
|
if len(rsGt) > 0 {
|
|
|
//拼装一下,合并两个结果集
|
|
|
for i := range rsGt {
|
|
|
rsEq = append(rsEq, rsGt[i])
|
|
|
}
|
|
|
//更改一下
|
|
|
lastUpdatedTime = rsGt[len(rsGt)-1]["last_updated_time"].(string)
|
|
|
idInt = rsGt[len(rsGt)-1]["id_int"].(int64)
|
|
|
}
|
|
|
}
|
|
|
return lastUpdatedTime, idInt, rsEq
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
功能:获取时间戳一致的数据
|
|
|
作者:黄海
|
|
|
时间:2020-7-17
|
|
|
*/
|
|
|
func getRecordEq(eqSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) {
|
|
|
rs, _ := db.SQL(eqSql, lastUpdatedTime, idInt, limit).Query().List()
|
|
|
if rs != nil && len(rs) > 0 {
|
|
|
//取出最后一行的数据进行回写
|
|
|
lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string)
|
|
|
idInt = rs[len(rs)-1]["id_int"].(int64)
|
|
|
return lastUpdatedTime, idInt, rs
|
|
|
} else {
|
|
|
return lastUpdatedTime, idInt, nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
功能:获取大于指定时间戳的数据
|
|
|
作者:黄海
|
|
|
时间:2020-7-17
|
|
|
*/
|
|
|
func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) {
|
|
|
rs, err := db.SQL(gtSql, lastUpdatedTime, limit).Query().List()
|
|
|
if err != nil {
|
|
|
fmt.Println(err.Error())
|
|
|
}
|
|
|
if rs != nil && len(rs) > 0 {
|
|
|
//取出最后一行的数据进行回写
|
|
|
lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string)
|
|
|
idInt := rs[len(rs)-1]["id_int"].(int64)
|
|
|
return lastUpdatedTime, idInt, rs
|
|
|
} else {
|
|
|
return lastUpdatedTime, idInt, nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
type ResultStruct struct {
|
|
|
Message string `json:"message"`
|
|
|
Success bool `json:"success"`
|
|
|
}
|
|
|
|
|
|
// 基础方法,这里多用于访问webapi,配合上json转换。
|
|
|
func httpDo(method string, url string, msg string) ResultStruct {
|
|
|
var p ResultStruct
|
|
|
p.Success = false
|
|
|
p.Message = "上报到汇集系统失败!请检查是否SystemToken是有效的,或者有两个及以上客户端同时在上报。"
|
|
|
client := &http.Client{}
|
|
|
body := bytes.NewBuffer([]byte(msg))
|
|
|
req, err := http.NewRequest(method,
|
|
|
url,
|
|
|
body)
|
|
|
if err != nil {
|
|
|
// handle error
|
|
|
}
|
|
|
req.Header.Set("Content-Type", "application/json;charset=utf-8")
|
|
|
resp, err := client.Do(req)
|
|
|
if err != nil {
|
|
|
fmt.Println(err)
|
|
|
return p
|
|
|
}
|
|
|
defer resp.Body.Close()
|
|
|
resultBody, err := ioutil.ReadAll(resp.Body)
|
|
|
if err != nil {
|
|
|
fmt.Println(err)
|
|
|
return p
|
|
|
}
|
|
|
json.Unmarshal(resultBody, &p)
|
|
|
return p
|
|
|
}
|