From 27c50fef1abe6b804706e251d96d9e11dc7dc86b Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 14:58:29 +0800 Subject: [PATCH] 'commit' --- dsBaseRpc/DataExchange/DataExchange.go | 142 ++++++++++++++----------- dsBaseRpc/Sql/t_sys_dict.sql | 13 +++ 2 files changed, 92 insertions(+), 63 deletions(-) create mode 100644 dsBaseRpc/Sql/t_sys_dict.sql diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index 20640c09..42fc8383 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -32,14 +32,20 @@ type tableStruct struct { DataSource string `json:"data_source"` } -//有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 -var sqlDict = []tableStruct{ +//有同步哪些表(增量),之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 +var IncrSqlDict = []tableStruct{ {TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"}, {TableName: "t_base_class", PrimaryKey: "class_id", DataSource: "org_class"}, {TableName: "t_base_teacher", PrimaryKey: "teacher_id", DataSource: "user_teacher"}, {TableName: "t_base_student", PrimaryKey: "student_id", DataSource: "user_student"}, } +// 全量数据上报 +var FullSqlDict = []tableStruct{ + {TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"}, + {TableName: "t_sys_dict", PrimaryKey: "dict_id", DataSource: "sys_dic"}, +} + // 数据上报的结构体 type postStruct struct { AuthToken string `json:"auth_token"` @@ -71,71 +77,75 @@ func init() { } /** -功能:组织机构上报 +功能:全量数据上报 作者:黄海 时间:2020-07-17 */ -func InitOrg() { - //(1)是不是进行过首次上报,如果没有话,需要执行一次组织机构上报 - logName := progressFilePath + "t_base_organization.log" - //判断文件是不是存在 - if !FileUtil.PathExists(logName) { - //上报组织机构,读取t_base_organization的SQL脚本 - //SQL内容 - sql := FileUtil.ReadFileContent("./Sql/t_base_organization.sql") - //组织机构是需要按org_type,area_code排序 - orgSql := sql + " order by t1.org_type,t1.area_code" - list, _ := db.SQL(orgSql).Query().List() +func InitFull() { + for i := range FullSqlDict { + //(1)是不是进行过首次上报,如果没有话,需要执行一次组织机构上报 + logName := progressFilePath + FullSqlDict[i].TableName + ".log" + //判断文件是不是存在 + if !FileUtil.PathExists(logName) { + //SQL内容 + sql := FileUtil.ReadFileContent("./Sql/"+FullSqlDict[i].TableName+".sql") - var t tableStruct - for i := range sqlDict { - if sqlDict[i].TableName == "t_base_organization" { - t = sqlDict[i] - break + var list []map[string]interface{} + //如果是组织机构表,那么需要变更一下查询的排序条件 + if FullSqlDict[i].TableName=="t_base_organization"{ + //组织机构是需要按org_type,area_code排序 + sql = sql + " order by t1.org_type,t1.area_code" } - } - var count = 0 - var isFinish = false - for { - if isFinish { - break + list, _ = db.SQL(sql).Query().List() + + var count = 0 + var isFinish = false + for { + if isFinish { + break + } + //利用切片分批次上报 + if len(list) > limit { + success := PostToServer(FullSqlDict[i], list[0:limit]) //0-99不包含100 + if !success { + continue + } + count = count + limit + list = list[limit:] + } else if len(list) > 0 { + success := PostToServer(FullSqlDict[i], list) + if !success { + continue + } + count = count + len(list) + isFinish = true + } else { + isFinish = true + } + fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成"+FullSqlDict[i].TableName+"初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!") } - //利用切片分批次上报 - if len(list) > limit { - success := PostToServer(t, list[0:limit]) //0-99不包含100 - if !success { - continue + //对于组织机构进行特殊处理 + if FullSqlDict[i].TableName=="t_base_organization"{ + //记录日志 + maxSql := sql + " order by t1.last_updated_time desc,t1.id_int desc limit 1" + var l logStruct + l.IdInt = 0 + l.StartUpdateTs = defaultStartTs + //取得最后一行的最大值 + list, _ = db.SQL(maxSql).Query().List() + if len(list) > 0 { + l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string) + l.IdInt = list[len(list)-1]["id_int"].(int64) } - count = count + limit - list = list[limit:] - } else if len(list) > 0 { - success := PostToServer(t, list) - if !success { - continue + jsonBytes, err := json.Marshal(l) + if err != nil { + fmt.Println(err.Error()) } - count = count + len(list) - isFinish = true - } else { - isFinish = true + FileUtil.WriteContent(logName, string(jsonBytes)) + }else{ + FileUtil.WriteContent(logName, "is finished!") } - fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成组织机构初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!") - } - //记录日志 - maxSql := sql + " order by t1.last_updated_time desc,t1.id_int desc limit 1" - var l logStruct - l.IdInt = 0 - l.StartUpdateTs = defaultStartTs - //取得最后一行的最大值 - list, _ = db.SQL(maxSql).Query().List() - if len(list) > 0 { - l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string) - l.IdInt = list[len(list)-1]["id_int"].(int64) - } - jsonBytes, err := json.Marshal(l) - if err != nil { - fmt.Println(err.Error()) } - FileUtil.WriteContent(logName, string(jsonBytes)) } } @@ -148,7 +158,7 @@ func DataExchange() { //死循环上报中 for { //(1)组织机构上报 - InitOrg() + InitFull() //(2)本轮上报的数量,如果是0,休息5秒后再继续上传 postCount := UploadData() @@ -167,7 +177,7 @@ func DataExchange() { func UploadData() int { var postCount = 0 //遍历所有的配置节,进行循环 - for i := range sqlDict { + for i := range IncrSqlDict { count := 0 //默认的开始时间 startUpdateTs := defaultStartTs @@ -176,7 +186,7 @@ func UploadData() int { //查询结果集 var list []map[string]interface{} //表名 - tableName := sqlDict[i].TableName + tableName := IncrSqlDict[i].TableName //日志文件位置 logName := progressFilePath + tableName + ".log" //判断文件是不是存在 @@ -198,7 +208,7 @@ func UploadData() int { break } //上报到Http Api--->Body--->Post - success := PostToServer(sqlDict[i], list) + success := PostToServer(IncrSqlDict[i], list) if !success { continue } @@ -233,12 +243,18 @@ func PostToServer(t tableStruct, list []map[string]interface{}) bool { ps.DataSource = t.DataSource ps.AuthToken = ConfigUtil.DataExchangeAuthToken ps.SystemId = ConfigUtil.DataExchangeSystemId - var dsMap = make([]dataStruct, 0) for k := range list { var ds dataStruct ds.Data, _ = CommonUtil.MapToJson(list[k]) - ds.DataId = list[k][t.PrimaryKey].(string) + switch list[k][t.PrimaryKey].(type){ + case int64: + ds.DataId = CommonUtil.ConvertInt64ToString(list[k][t.PrimaryKey].(int64)) + break + default: + ds.DataId = list[k][t.PrimaryKey].(string) + break + } ds.DelFlag = list[k]["del_flag"].(int64) ds.OrgId = list[k]["bureau_id"].(string) dsMap = append(dsMap, ds) diff --git a/dsBaseRpc/Sql/t_sys_dict.sql b/dsBaseRpc/Sql/t_sys_dict.sql new file mode 100644 index 00000000..627f92f4 --- /dev/null +++ b/dsBaseRpc/Sql/t_sys_dict.sql @@ -0,0 +1,13 @@ +SELECT + t1.`dict_id` AS `dict_id`, + t1.`dict_kind` AS `dict_kind`, + t1.`dict_code` AS `dict_code`, + t1.`dict_value` AS `dict_value`, + t1.`dict_remark` AS `dict_remark`, + t1.`dict_parent` AS `dict_parent`, + t1.`sort_id` AS `sort_id`, + cast( t1.`b_use` AS signed ) AS `b_use`, + '-1' as bureau_id, + case t1.`b_use` when -1 then 1 else 0 end AS `del_flag` +FROM + `t_sys_dict` as t1 \ No newline at end of file