master
huanghai 5 years ago
parent 95f8117e93
commit 27c50fef1a

@ -32,14 +32,20 @@ type tableStruct struct {
DataSource string `json:"data_source"` DataSource string `json:"data_source"`
} }
//有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 //有同步哪些表(增量),之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序
var sqlDict = []tableStruct{ var IncrSqlDict = []tableStruct{
{TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"}, {TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"},
{TableName: "t_base_class", PrimaryKey: "class_id", DataSource: "org_class"}, {TableName: "t_base_class", PrimaryKey: "class_id", DataSource: "org_class"},
{TableName: "t_base_teacher", PrimaryKey: "teacher_id", DataSource: "user_teacher"}, {TableName: "t_base_teacher", PrimaryKey: "teacher_id", DataSource: "user_teacher"},
{TableName: "t_base_student", PrimaryKey: "student_id", DataSource: "user_student"}, {TableName: "t_base_student", PrimaryKey: "student_id", DataSource: "user_student"},
} }
// 全量数据上报
var FullSqlDict = []tableStruct{
{TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"},
{TableName: "t_sys_dict", PrimaryKey: "dict_id", DataSource: "sys_dic"},
}
// 数据上报的结构体 // 数据上报的结构体
type postStruct struct { type postStruct struct {
AuthToken string `json:"auth_token"` AuthToken string `json:"auth_token"`
@ -71,29 +77,27 @@ func init() {
} }
/** /**
2020-07-17 2020-07-17
*/ */
func InitOrg() { func InitFull() {
for i := range FullSqlDict {
//(1)是不是进行过首次上报,如果没有话,需要执行一次组织机构上报 //(1)是不是进行过首次上报,如果没有话,需要执行一次组织机构上报
logName := progressFilePath + "t_base_organization.log" logName := progressFilePath + FullSqlDict[i].TableName + ".log"
//判断文件是不是存在 //判断文件是不是存在
if !FileUtil.PathExists(logName) { if !FileUtil.PathExists(logName) {
//上报组织机构,读取t_base_organization的SQL脚本
//SQL内容 //SQL内容
sql := FileUtil.ReadFileContent("./Sql/t_base_organization.sql") sql := FileUtil.ReadFileContent("./Sql/"+FullSqlDict[i].TableName+".sql")
//组织机构是需要按org_type,area_code排序
orgSql := sql + " order by t1.org_type,t1.area_code"
list, _ := db.SQL(orgSql).Query().List()
var t tableStruct var list []map[string]interface{}
for i := range sqlDict { //如果是组织机构表,那么需要变更一下查询的排序条件
if sqlDict[i].TableName == "t_base_organization" { if FullSqlDict[i].TableName=="t_base_organization"{
t = sqlDict[i] //组织机构是需要按org_type,area_code排序
break sql = sql + " order by t1.org_type,t1.area_code"
}
} }
list, _ = db.SQL(sql).Query().List()
var count = 0 var count = 0
var isFinish = false var isFinish = false
for { for {
@ -102,14 +106,14 @@ func InitOrg() {
} }
//利用切片分批次上报 //利用切片分批次上报
if len(list) > limit { if len(list) > limit {
success := PostToServer(t, list[0:limit]) //0-99不包含100 success := PostToServer(FullSqlDict[i], list[0:limit]) //0-99不包含100
if !success { if !success {
continue continue
} }
count = count + limit count = count + limit
list = list[limit:] list = list[limit:]
} else if len(list) > 0 { } else if len(list) > 0 {
success := PostToServer(t, list) success := PostToServer(FullSqlDict[i], list)
if !success { if !success {
continue continue
} }
@ -118,8 +122,10 @@ func InitOrg() {
} else { } else {
isFinish = true isFinish = true
} }
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成组织机构初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!") fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成"+FullSqlDict[i].TableName+"初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!")
} }
//对于组织机构进行特殊处理
if FullSqlDict[i].TableName=="t_base_organization"{
//记录日志 //记录日志
maxSql := sql + " order by t1.last_updated_time desc,t1.id_int desc limit 1" maxSql := sql + " order by t1.last_updated_time desc,t1.id_int desc limit 1"
var l logStruct var l logStruct
@ -136,6 +142,10 @@ func InitOrg() {
fmt.Println(err.Error()) fmt.Println(err.Error())
} }
FileUtil.WriteContent(logName, string(jsonBytes)) FileUtil.WriteContent(logName, string(jsonBytes))
}else{
FileUtil.WriteContent(logName, "is finished!")
}
}
} }
} }
@ -148,7 +158,7 @@ func DataExchange() {
//死循环上报中 //死循环上报中
for { for {
//(1)组织机构上报 //(1)组织机构上报
InitOrg() InitFull()
//(2)本轮上报的数量如果是0休息5秒后再继续上传 //(2)本轮上报的数量如果是0休息5秒后再继续上传
postCount := UploadData() postCount := UploadData()
@ -167,7 +177,7 @@ func DataExchange() {
func UploadData() int { func UploadData() int {
var postCount = 0 var postCount = 0
//遍历所有的配置节,进行循环 //遍历所有的配置节,进行循环
for i := range sqlDict { for i := range IncrSqlDict {
count := 0 count := 0
//默认的开始时间 //默认的开始时间
startUpdateTs := defaultStartTs startUpdateTs := defaultStartTs
@ -176,7 +186,7 @@ func UploadData() int {
//查询结果集 //查询结果集
var list []map[string]interface{} var list []map[string]interface{}
//表名 //表名
tableName := sqlDict[i].TableName tableName := IncrSqlDict[i].TableName
//日志文件位置 //日志文件位置
logName := progressFilePath + tableName + ".log" logName := progressFilePath + tableName + ".log"
//判断文件是不是存在 //判断文件是不是存在
@ -198,7 +208,7 @@ func UploadData() int {
break break
} }
//上报到Http Api--->Body--->Post //上报到Http Api--->Body--->Post
success := PostToServer(sqlDict[i], list) success := PostToServer(IncrSqlDict[i], list)
if !success { if !success {
continue continue
} }
@ -233,12 +243,18 @@ func PostToServer(t tableStruct, list []map[string]interface{}) bool {
ps.DataSource = t.DataSource ps.DataSource = t.DataSource
ps.AuthToken = ConfigUtil.DataExchangeAuthToken ps.AuthToken = ConfigUtil.DataExchangeAuthToken
ps.SystemId = ConfigUtil.DataExchangeSystemId ps.SystemId = ConfigUtil.DataExchangeSystemId
var dsMap = make([]dataStruct, 0) var dsMap = make([]dataStruct, 0)
for k := range list { for k := range list {
var ds dataStruct var ds dataStruct
ds.Data, _ = CommonUtil.MapToJson(list[k]) ds.Data, _ = CommonUtil.MapToJson(list[k])
switch list[k][t.PrimaryKey].(type){
case int64:
ds.DataId = CommonUtil.ConvertInt64ToString(list[k][t.PrimaryKey].(int64))
break
default:
ds.DataId = list[k][t.PrimaryKey].(string) ds.DataId = list[k][t.PrimaryKey].(string)
break
}
ds.DelFlag = list[k]["del_flag"].(int64) ds.DelFlag = list[k]["del_flag"].(int64)
ds.OrgId = list[k]["bureau_id"].(string) ds.OrgId = list[k]["bureau_id"].(string)
dsMap = append(dsMap, ds) dsMap = append(dsMap, ds)

@ -0,0 +1,13 @@
SELECT
t1.`dict_id` AS `dict_id`,
t1.`dict_kind` AS `dict_kind`,
t1.`dict_code` AS `dict_code`,
t1.`dict_value` AS `dict_value`,
t1.`dict_remark` AS `dict_remark`,
t1.`dict_parent` AS `dict_parent`,
t1.`sort_id` AS `sort_id`,
cast( t1.`b_use` AS signed ) AS `b_use`,
'-1' as bureau_id,
case t1.`b_use` when -1 then 1 else 0 end AS `del_flag`
FROM
`t_sys_dict` as t1
Loading…
Cancel
Save