From bb18d1dd1b73183be0a06c9977c9eb8a35f938cd Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 09:44:02 +0800 Subject: [PATCH 1/5] 'commit' --- dsBaseRpc/DataEx/DataEx.go | 149 +++++++++++++++++++++++--- dsBaseRpc/Sql/ds_base.xml | 79 -------------- dsBaseRpc/Sql/t_base_class.sql | 28 +++++ dsBaseRpc/Sql/t_base_organization.sql | 36 +++++++ 4 files changed, 198 insertions(+), 94 deletions(-) delete mode 100644 dsBaseRpc/Sql/ds_base.xml create mode 100644 dsBaseRpc/Sql/t_base_class.sql create mode 100644 dsBaseRpc/Sql/t_base_organization.sql diff --git a/dsBaseRpc/DataEx/DataEx.go b/dsBaseRpc/DataEx/DataEx.go index daf8e51e..afd15bf5 100644 --- a/dsBaseRpc/DataEx/DataEx.go +++ b/dsBaseRpc/DataEx/DataEx.go @@ -1,15 +1,20 @@ package DataEx import ( - "dsBaseRpc/Const" + "dsBaseRpc/Utils/CommonUtil" "dsBaseRpc/Utils/DbUtil" - "dsBaseRpc/Utils/LogUtil" + "dsBaseRpc/Utils/FileUtil" + "encoding/json" "fmt" + "time" ) var db = DbUtil.Engine -//有同步哪些表 +// 日志文件路径 +var progressFilePath = "/usr/local/SyncDataLogs/" + +//有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 var sqlDict = []string{"t_base_organization", "t_base_class"} /** @@ -19,20 +24,134 @@ var sqlDict = []string{"t_base_organization", "t_base_class"} */ func DataExchange() { for { - //本轮上报的数量,如果是0,休息10秒后再继续上传 - count:=0 - //遍历所有的配置节,进行循环 - for i := range sqlDict { - //判断日志目录下的记录是不是存在? - - //存在则读取last_updated_time+id_int进行分页获取 - paramMap := map[string]interface{}{"last_updated_time": "2020-07-15 00:00:00"} - list, err := db.SqlMapClient(sqlDict[i], ¶mMap).Query().List() - if err != nil { - LogUtil.Error(Const.DataBaseActionError, "数据上报时发生了严重错误:"+err.Error()) + //本轮上报的数量,如果是0,休息5秒后再继续上传 + postCount := UploadData() + if postCount == 0 { + fmt.Println("同步:本轮没有可以上报的数据,将休息5秒!") + time.Sleep(5 * 1e9) + } + } +} + +// 日志文件对应的结构体 +type logStruct struct { + startUpdateTs string + idInt int +} + +/** + * 功能:上报所有大于给定时间戳的数据 + * 作者:黄海 + * 时间:2019-01-02 + */ +func UploadData() int { + var postCount = 0 + //遍历所有的配置节,进行循环 + for i := range sqlDict { + count := 0 + //默认的开始时间 + startUpdateTs := "1970-01-01 00:00:00" + //默认的整数主键值 + idInt := 0 + //查询结果集 + var list []map[string]interface{} + //表名 + tableName := sqlDict[i] + //日志文件位置 + logName := progressFilePath + tableName + ".log" + //判断文件是不是存在 + if FileUtil.PathExists(logName) { + //读取配置信息 + var logstruct logStruct + if err := json.Unmarshal([]byte(FileUtil.ReadFileContent(logName)), &logstruct); err == nil { + startUpdateTs = logstruct.startUpdateTs + idInt = logstruct.idInt + } else { + fmt.Println("解析JSON文件失败:" + err.Error()) + } + } + //一直循环 + for { + //获取数据 + startUpdateTs, idInt, list = getRecord(tableName, startUpdateTs, idInt) + if len(list) == 0 { break } - fmt.Println(list) + //上报到Http Api + //TODO + + //记录上报数量 + count = count + len(list) + //记录日志 + var l logStruct + l.idInt = idInt + l.startUpdateTs = startUpdateTs + jsonBytes, _ := json.Marshal(l) + FileUtil.WriteContent(logName, string(jsonBytes)) + //提示信息 + fmt.Println("同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + + "个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertIntToString(idInt)) + } + postCount = postCount + count + } + return postCount +} + +/** + * 功能:获取一个表的指定大小的数据 + * 作者:黄海 + * 时间:2019-01-16 + */ +func getRecord(tableName string, lastUpdatedTime string, idInt int) (string, int, []map[string]interface{}) { + //每次获取的条数 + limit := 100 + //SQL内容 + sqlPublic := FileUtil.ReadFileContent("./Sql/" + tableName + ".sql") + //时间戳相等的SQL + eqSql := sqlPublic + " where t1.last_updated_time =? and t1.id_int>? order by t1.last_updated_time,t1.id_int limit ?" + //时间戳大于的SQL + gtSql := sqlPublic + " where t1.last_updated_time >? order by t1.last_updated_time,t1.id_int limit ?" + //(1)计算相等的 + var rsEq = make([]map[string]interface{}, 0) + var rsGt = make([]map[string]interface{}, 0) + lastUpdatedTime, idInt, rsEq = getrecordEq(eqSql, lastUpdatedTime, idInt, limit) + + if len(rsEq) < limit { + //尝试一次gt操作(组合一下) + lastUpdatedTime, idInt, rsGt = getrecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq)) + if len(rsGt) > 0 { + //拼装一下,合并两个结果集 + for i := range rsGt { + rsEq = append(rsEq, rsGt[i]) + } + //更改一下 + lastUpdatedTime = rsGt[len(rsGt)-1]["last_updated_time"].(string) + idInt = rsGt[len(rsGt)-1]["id_int"].(int) } } + return lastUpdatedTime, idInt, rsEq +} + +func getrecordEq(eqSql string, lastUpdatedTime string, idInt int, limit int) (string, int, []map[string]interface{}) { + rs, _ := db.SQL(eqSql, lastUpdatedTime, idInt, limit).Query().List() + if rs != nil && len(rs) > 0 { + //取出最后一行的数据进行回写 + lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) + idInt = rs[len(rs)-1]["id_int"].(int) + return lastUpdatedTime, idInt, rs + } else { + return lastUpdatedTime, idInt, nil + } +} + +func getrecordGt(gtSql string, lastUpdatedTime string, idInt int, limit int) (string, int, []map[string]interface{}) { + rs, _ := db.SQL(gtSql, lastUpdatedTime, limit).Query().List() + if rs != nil && len(rs) > 0 { + //取出最后一行的数据进行回写 + lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) + idInt := rs[len(rs)-1]["id_int"].(int) + return lastUpdatedTime, idInt, rs + } else { + return lastUpdatedTime, idInt, nil + } } diff --git a/dsBaseRpc/Sql/ds_base.xml b/dsBaseRpc/Sql/ds_base.xml deleted file mode 100644 index 6016d265..00000000 --- a/dsBaseRpc/Sql/ds_base.xml +++ /dev/null @@ -1,79 +0,0 @@ - - - SELECT - cast( `t_base_organization`.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`, - `t_base_organization`.`id_int` AS `id_int`, - `t_base_organization`.`org_code` AS `org_code`, - `t_base_organization`.`org_name` AS `org_name`, - cast( `t_base_organization`.`parent_id` AS CHAR ( 36 ) charset utf8 ) AS `parent_id`, - cast( `t_base_organization`.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`, - `t_base_organization`.`org_type` AS `org_type`, - `t_base_organization`.`edu_assist_type` AS `edu_assist_type`, - `t_base_organization`.`main_school_type` AS `main_school_type`, - cast( `t_base_organization`.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`, - cast( `t_base_organization`.`manage_org_id` AS CHAR ( 36 ) charset utf8 ) AS `manage_org_id`, - `t_base_organization`.`directly_under_type` AS `directly_under_type`, - `t_base_organization`.`xxbbm` AS `xxbbm`, - `t_base_organization`.`xxbxlxm` AS `xxbxlxm`, - `t_base_organization`.`szdcxlxm` AS `szdcxlxm`, - `t_base_organization`.`xxjbzm` AS `xxjbzm`, - cast( `t_base_organization`.`fzr` AS CHAR ( 36 ) charset utf8 ) AS `fzr`, - `t_base_organization`.`fddbr` AS `fddbr`, - `t_base_organization`.`fddbrdh` AS `fddbrdh`, - `t_base_organization`.`address` AS `address`, - `t_base_organization`.`lxdh` AS `lxdh`, - `t_base_organization`.`org_lng` AS `org_lng`, - `t_base_organization`.`org_lat` AS `org_lat`, - `t_base_organization`.`sort_id` AS `sort_id`, - cast( `t_base_organization`.`b_use` AS signed ) AS `b_use`, - `t_base_organization`.`province_code` AS `province_code`, - `t_base_organization`.`city_code` AS `city_code`, - `t_base_organization`.`district_code` AS `district_code`, - `t_base_organization`.`area_code` AS `area_code`, - `t_base_organization`.`last_updated_time` AS `last_updated_time`, - DATE_FORMAT(`t_base_organization`.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`, - case `t_base_organization`.`b_use` when -1 then 1 else 0 end AS `del_flag` - FROM - `t_base_organization` - WHERE - `t_base_organization`.`last_updated_time`>?last_updated_time - ORDER BY - `t_base_organization`.`last_updated_time` - - - - SELECT - cast( `t_base_class`.`class_id` AS CHAR ( 36 ) charset utf8 ) AS `class_id`, - `t_base_class`.`id_int` AS `id_int`, - `t_base_class`.`bh` AS `bh`, - `t_base_class`.`class_code` AS `class_code`, - `t_base_class`.`class_name` AS `class_name`, - `t_base_class`.`class_alias` AS `class_alias`, - `t_base_class`.`rxnf` AS `rxnf`, - `t_base_class`.`rxjj` AS `rxjj`, - `t_base_class`.`schooling_length` AS `schooling_length`, - `t_base_class`.`stage_id` AS `stage_id`, - cast( `t_base_class`.`teacher_id` AS CHAR ( 36 ) charset utf8 ) AS `teacher_id`, - cast( `t_base_class`.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`, - cast( `t_base_class`.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`, - cast( `t_base_class`.`b_use` AS signed ) AS `b_use`, - `t_base_class`.`province_code` AS `province_code`, - `t_base_class`.`city_code` AS `city_code`, - `t_base_class`.`district_code` AS `district_code`, - cast( `t_base_class`.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`, - DATE_FORMAT(`t_base_class`.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`, - `t_base_class`.`last_updated_time` AS `last_updated_time`, - `t_dm_stage`.`stage_name` AS `stage_name`, - `t_base_organization`.`org_name` AS `org_name` , - case `t_base_class`.`b_use` when -1 then 1 else 0 end AS `del_flag` - FROM - (( - `t_base_class` - JOIN `t_dm_stage` ON ( `t_base_class`.`stage_id` = `t_dm_stage`.`stage_id` )) - JOIN `t_base_organization` ON ( `t_base_class`.`bureau_id` = `t_base_organization`.`org_id` )) - WHERE - `t_base_class`.`last_updated_time`>?last_updated_time - ORDER BY - `t_base_class`.`last_updated_time` - - diff --git a/dsBaseRpc/Sql/t_base_class.sql b/dsBaseRpc/Sql/t_base_class.sql new file mode 100644 index 00000000..c5261d14 --- /dev/null +++ b/dsBaseRpc/Sql/t_base_class.sql @@ -0,0 +1,28 @@ +SELECT + cast( t1.`class_id` AS CHAR ( 36 ) charset utf8 ) AS `class_id`, + t1.`id_int` AS `id_int`, + t1.`bh` AS `bh`, + t1.`class_code` AS `class_code`, + t1.`class_name` AS `class_name`, + t1.`class_alias` AS `class_alias`, + t1.`rxnf` AS `rxnf`, + t1.`rxjj` AS `rxjj`, + t1.`schooling_length` AS `schooling_length`, + t1.`stage_id` AS `stage_id`, + cast( t1.`teacher_id` AS CHAR ( 36 ) charset utf8 ) AS `teacher_id`, + cast( t1.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`, + cast( t1.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`, + cast( t1.`b_use` AS signed ) AS `b_use`, + t1.`province_code` AS `province_code`, + t1.`city_code` AS `city_code`, + t1.`district_code` AS `district_code`, + cast( t1.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`, + DATE_FORMAT(t1.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`, + t1.`last_updated_time` AS `last_updated_time`, + `t_dm_stage`.`stage_name` AS `stage_name`, + `t_base_organization`.`org_name` AS `org_name` , + case t1.`b_use` when -1 then 1 else 0 end AS `del_flag` + FROM + ((t_base_class as t1 JOIN `t_dm_stage` ON ( t1.`stage_id` = `t_dm_stage`.`stage_id` )) + JOIN `t_base_organization` ON ( t1.`bureau_id` = `t_base_organization`.`org_id` )) + \ No newline at end of file diff --git a/dsBaseRpc/Sql/t_base_organization.sql b/dsBaseRpc/Sql/t_base_organization.sql new file mode 100644 index 00000000..3533c896 --- /dev/null +++ b/dsBaseRpc/Sql/t_base_organization.sql @@ -0,0 +1,36 @@ +SELECT + cast( t1.`org_id` AS CHAR ( 36 ) charset utf8 ) AS `org_id`, + t1.`id_int` AS `id_int`, + t1.`org_code` AS `org_code`, + t1.`org_name` AS `org_name`, + cast( t1.`parent_id` AS CHAR ( 36 ) charset utf8 ) AS `parent_id`, + cast( t1.`bureau_id` AS CHAR ( 36 ) charset utf8 ) AS `bureau_id`, + t1.`org_type` AS `org_type`, + t1.`edu_assist_type` AS `edu_assist_type`, + t1.`main_school_type` AS `main_school_type`, + cast( t1.`main_school_id` AS CHAR ( 36 ) charset utf8 ) AS `main_school_id`, + cast( t1.`manage_org_id` AS CHAR ( 36 ) charset utf8 ) AS `manage_org_id`, + t1.`directly_under_type` AS `directly_under_type`, + t1.`xxbbm` AS `xxbbm`, + t1.`xxbxlxm` AS `xxbxlxm`, + t1.`szdcxlxm` AS `szdcxlxm`, + t1.`xxjbzm` AS `xxjbzm`, + cast( t1.`fzr` AS CHAR ( 36 ) charset utf8 ) AS `fzr`, + t1.`fddbr` AS `fddbr`, + t1.`fddbrdh` AS `fddbrdh`, + t1.`address` AS `address`, + t1.`lxdh` AS `lxdh`, + t1.`org_lng` AS `org_lng`, + t1.`org_lat` AS `org_lat`, + t1.`sort_id` AS `sort_id`, + cast( t1.`b_use` AS signed ) AS `b_use`, + t1.`province_code` AS `province_code`, + t1.`city_code` AS `city_code`, + t1.`district_code` AS `district_code`, + t1.`area_code` AS `area_code`, + t1.`last_updated_time` AS `last_updated_time`, + DATE_FORMAT(t1.`create_time`,'%Y/%m/%d %H:%i:%s') AS `create_time`, + case t1.`b_use` when -1 then 1 else 0 end AS `del_flag` + FROM + `t_base_organization` as t1 + From 64789038a6dd6274950ac508bb57df6508cb8f8c Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 10:01:16 +0800 Subject: [PATCH 2/5] 'commit' --- dsBaseRpc/DataEx/DataEx.go | 57 ++++++++++++++++++++++--------- dsBaseRpc/t_base_organization.log | 1 + 2 files changed, 41 insertions(+), 17 deletions(-) create mode 100644 dsBaseRpc/t_base_organization.log diff --git a/dsBaseRpc/DataEx/DataEx.go b/dsBaseRpc/DataEx/DataEx.go index afd15bf5..6cfe3d3b 100644 --- a/dsBaseRpc/DataEx/DataEx.go +++ b/dsBaseRpc/DataEx/DataEx.go @@ -6,6 +6,7 @@ import ( "dsBaseRpc/Utils/FileUtil" "encoding/json" "fmt" + "os" "time" ) @@ -17,6 +18,15 @@ var progressFilePath = "/usr/local/SyncDataLogs/" //有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 var sqlDict = []string{"t_base_organization", "t_base_class"} +/** +功能:初始化目录 +*/ +func init() { + if !FileUtil.PathExists(progressFilePath) { + os.MkdirAll(progressFilePath, os.ModePerm) + } +} + /** 功能:数据上报 作者:黄海 @@ -35,8 +45,8 @@ func DataExchange() { // 日志文件对应的结构体 type logStruct struct { - startUpdateTs string - idInt int + StartUpdateTs string `json:"start_update_ts"` + IdInt int64 `json:"id_int"` } /** @@ -52,7 +62,7 @@ func UploadData() int { //默认的开始时间 startUpdateTs := "1970-01-01 00:00:00" //默认的整数主键值 - idInt := 0 + var idInt int64 = 0 //查询结果集 var list []map[string]interface{} //表名 @@ -64,8 +74,8 @@ func UploadData() int { //读取配置信息 var logstruct logStruct if err := json.Unmarshal([]byte(FileUtil.ReadFileContent(logName)), &logstruct); err == nil { - startUpdateTs = logstruct.startUpdateTs - idInt = logstruct.idInt + startUpdateTs = logstruct.StartUpdateTs + idInt = logstruct.IdInt } else { fmt.Println("解析JSON文件失败:" + err.Error()) } @@ -84,13 +94,16 @@ func UploadData() int { count = count + len(list) //记录日志 var l logStruct - l.idInt = idInt - l.startUpdateTs = startUpdateTs - jsonBytes, _ := json.Marshal(l) + l.IdInt = idInt + l.StartUpdateTs = startUpdateTs + jsonBytes, err := json.Marshal(l) + if err != nil { + fmt.Println(err.Error()) + } FileUtil.WriteContent(logName, string(jsonBytes)) //提示信息 fmt.Println("同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + - "个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertIntToString(idInt)) + "个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertInt64ToString(idInt)) } postCount = postCount + count } @@ -102,7 +115,7 @@ func UploadData() int { * 作者:黄海 * 时间:2019-01-16 */ -func getRecord(tableName string, lastUpdatedTime string, idInt int) (string, int, []map[string]interface{}) { +func getRecord(tableName string, lastUpdatedTime string, idInt int64) (string, int64, []map[string]interface{}) { //每次获取的条数 limit := 100 //SQL内容 @@ -114,11 +127,11 @@ func getRecord(tableName string, lastUpdatedTime string, idInt int) (string, int //(1)计算相等的 var rsEq = make([]map[string]interface{}, 0) var rsGt = make([]map[string]interface{}, 0) - lastUpdatedTime, idInt, rsEq = getrecordEq(eqSql, lastUpdatedTime, idInt, limit) + lastUpdatedTime, idInt, rsEq = getRecordEq(eqSql, lastUpdatedTime, idInt, limit) if len(rsEq) < limit { //尝试一次gt操作(组合一下) - lastUpdatedTime, idInt, rsGt = getrecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq)) + lastUpdatedTime, idInt, rsGt = getRecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq)) if len(rsGt) > 0 { //拼装一下,合并两个结果集 for i := range rsGt { @@ -126,30 +139,40 @@ func getRecord(tableName string, lastUpdatedTime string, idInt int) (string, int } //更改一下 lastUpdatedTime = rsGt[len(rsGt)-1]["last_updated_time"].(string) - idInt = rsGt[len(rsGt)-1]["id_int"].(int) + idInt = rsGt[len(rsGt)-1]["id_int"].(int64) } } return lastUpdatedTime, idInt, rsEq } -func getrecordEq(eqSql string, lastUpdatedTime string, idInt int, limit int) (string, int, []map[string]interface{}) { +/** +功能:获取时间戳一致的数据 +作者:黄海 +时间:2020-7-17 +*/ +func getRecordEq(eqSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) { rs, _ := db.SQL(eqSql, lastUpdatedTime, idInt, limit).Query().List() if rs != nil && len(rs) > 0 { //取出最后一行的数据进行回写 lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) - idInt = rs[len(rs)-1]["id_int"].(int) + idInt = rs[len(rs)-1]["id_int"].(int64) return lastUpdatedTime, idInt, rs } else { return lastUpdatedTime, idInt, nil } } -func getrecordGt(gtSql string, lastUpdatedTime string, idInt int, limit int) (string, int, []map[string]interface{}) { +/** +功能:获取大于指定时间戳的数据 +作者:黄海 +时间:2020-7-17 +*/ +func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) { rs, _ := db.SQL(gtSql, lastUpdatedTime, limit).Query().List() if rs != nil && len(rs) > 0 { //取出最后一行的数据进行回写 lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) - idInt := rs[len(rs)-1]["id_int"].(int) + idInt := rs[len(rs)-1]["id_int"].(int64) return lastUpdatedTime, idInt, rs } else { return lastUpdatedTime, idInt, nil diff --git a/dsBaseRpc/t_base_organization.log b/dsBaseRpc/t_base_organization.log new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/dsBaseRpc/t_base_organization.log @@ -0,0 +1 @@ +{} \ No newline at end of file From 417166b3a2d0571c26f0cd7b8d0db50dc6297ebc Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 10:02:54 +0800 Subject: [PATCH 3/5] 'commit' --- .../{DataEx/DataEx.go => DataExchange/DataExchange.go} | 2 +- dsBaseRpc/main.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) rename dsBaseRpc/{DataEx/DataEx.go => DataExchange/DataExchange.go} (99%) diff --git a/dsBaseRpc/DataEx/DataEx.go b/dsBaseRpc/DataExchange/DataExchange.go similarity index 99% rename from dsBaseRpc/DataEx/DataEx.go rename to dsBaseRpc/DataExchange/DataExchange.go index 6cfe3d3b..9d212275 100644 --- a/dsBaseRpc/DataEx/DataEx.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -1,4 +1,4 @@ -package DataEx +package DataExchange import ( "dsBaseRpc/Utils/CommonUtil" diff --git a/dsBaseRpc/main.go b/dsBaseRpc/main.go index eaab87ba..e95834b7 100644 --- a/dsBaseRpc/main.go +++ b/dsBaseRpc/main.go @@ -1,7 +1,7 @@ package main import ( - "dsBaseRpc/DataEx" + "dsBaseRpc/DataExchange" "dsBaseRpc/RpcService/BaseClass/BaseClassProto" "dsBaseRpc/RpcService/BaseClass/BaseClassService" "dsBaseRpc/RpcService/BaseGlobal/BaseGlobalProto" @@ -99,12 +99,12 @@ func main() { //五、开启一个数据上报的协程 go func() { - DataEx.DataExchange() + DataExchange.DataExchange() }() //六、 注册反射服务 这个服务是CLI使用的 跟服务本身没有关系 reflection.Register(s) //七、启动 - fmt.Printf("服务发布成功,端口:%s", ConfigUtil.RpcServerPort) + fmt.Printf("服务发布成功,端口:%s\n", ConfigUtil.RpcServerPort) if err := s.Serve(lis); err != nil { fmt.Printf("failed to serve: %v\n", err) } From 3902d55295e749ce8cda6762d083076ab27706ef Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 10:04:54 +0800 Subject: [PATCH 4/5] 'commit' --- dsBaseRpc/DataExchange/DataExchange.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index 9d212275..1bab01e4 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -37,7 +37,7 @@ func DataExchange() { //本轮上报的数量,如果是0,休息5秒后再继续上传 postCount := UploadData() if postCount == 0 { - fmt.Println("同步:本轮没有可以上报的数据,将休息5秒!") + fmt.Println(CommonUtil.GetCurrentTime()+" 同步:本轮没有可以上报的数据,将休息5秒!") time.Sleep(5 * 1e9) } } @@ -77,7 +77,7 @@ func UploadData() int { startUpdateTs = logstruct.StartUpdateTs idInt = logstruct.IdInt } else { - fmt.Println("解析JSON文件失败:" + err.Error()) + fmt.Println(CommonUtil.GetCurrentTime()+" 解析JSON文件失败:" + err.Error()) } } //一直循环 @@ -102,7 +102,7 @@ func UploadData() int { } FileUtil.WriteContent(logName, string(jsonBytes)) //提示信息 - fmt.Println("同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + + fmt.Println(CommonUtil.GetCurrentTime()+" 同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + "个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertInt64ToString(idInt)) } postCount = postCount + count From 9a553576153e28d4e7d59c4b4d1d629da0fecc40 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 11:18:02 +0800 Subject: [PATCH 5/5] 'commit' --- dsBaseRpc/Config/Config.ini | 66 +++++++------ dsBaseRpc/DataExchange/DataExchange.go | 118 ++++++++++++++++++++--- dsBaseRpc/Utils/CommonUtil/CommonUtil.go | 23 +++++ dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go | 6 ++ 4 files changed, 169 insertions(+), 44 deletions(-) diff --git a/dsBaseRpc/Config/Config.ini b/dsBaseRpc/Config/Config.ini index 18c955e9..6db2b79d 100644 --- a/dsBaseRpc/Config/Config.ini +++ b/dsBaseRpc/Config/Config.ini @@ -1,31 +1,35 @@ -[mysql] # mysql的配置项 -ip = server.dsmin.com -port = 22066 -database = base_db_dev -user = root -pwd = DsideaL147258369 - -[distribute] #发布功能的配置 -ip = server.dsmin.com -port = 26611 -user = root -pwd = dsideal -remotePath = /usr/local/dsMin/dsBaseRpc/ -localPath = E:\Work\dsMin\dsBaseRpc\build - -[redis] -ip =server.dsmin.com -port = 18890 -db = 0 -expireTime = 86400 - -# 注册rpc server -[rpcServer] -port = 8001 - -# 本项目名称,用于记录日志 -[project] -project_name = dsBaseRpc - -[kafka] -KafkaAddress = server.dsmin.com:9092 +[mysql] # mysql的配置项 +ip = server.dsmin.com +port = 22066 +database = base_db_dev +user = root +pwd = DsideaL147258369 + +[distribute] #发布功能的配置 +ip = server.dsmin.com +port = 26611 +user = root +pwd = dsideal +remotePath = /usr/local/dsMin/dsBaseRpc/ +localPath = E:\Work\dsMin\dsBaseRpc\build + +[redis] +ip =server.dsmin.com +port = 18890 +db = 0 +expireTime = 86400 + +# 注册rpc server +[rpcServer] +port = 8001 + +# 本项目名称,用于记录日志 +[project] +project_name = dsBaseRpc + +[kafka] +KafkaAddress = server.dsmin.com:9092 + +# 数据汇集的地址 +[dataExchange] +url = http://10.10.14.239:9009/v1/dataex/DataexSet diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index 1bab01e4..9612c6b5 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -1,11 +1,15 @@ package DataExchange import ( + "bytes" "dsBaseRpc/Utils/CommonUtil" + "dsBaseRpc/Utils/ConfigUtil" "dsBaseRpc/Utils/DbUtil" "dsBaseRpc/Utils/FileUtil" "encoding/json" "fmt" + "io/ioutil" + "net/http" "os" "time" ) @@ -15,8 +19,39 @@ var db = DbUtil.Engine // 日志文件路径 var progressFilePath = "/usr/local/SyncDataLogs/" +//建立与汇集中心的主题映射关系结构 +type tableStruct struct { + TableName string `json:"table_name"` + PrimaryKey string `json:"primary_key"` + DataSource string `json:"data_source"` +} + //有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 -var sqlDict = []string{"t_base_organization", "t_base_class"} +var sqlDict = []tableStruct{ + {TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"}, + {TableName: "t_base_class", PrimaryKey: "class_id", DataSource: "org_class"}, +} + +// 数据上报的结构体 +type postStruct struct { + AuthToken string `json:"auth_token"` + DataSource string `json:"data_source"` + SystemId string `json:"system_id"` + Datas []dataStruct `json:"datas"` +} + +type dataStruct struct { + Data string `json:"data"` + DataId string `json:"data_id"` + DelFlag int64 `json:"del_flag"` + OrgId string `json:"org_id"` +} + +// 日志文件对应的结构体 +type logStruct struct { + StartUpdateTs string `json:"start_update_ts"` + IdInt int64 `json:"id_int"` +} /** 功能:初始化目录 @@ -37,18 +72,12 @@ func DataExchange() { //本轮上报的数量,如果是0,休息5秒后再继续上传 postCount := UploadData() if postCount == 0 { - fmt.Println(CommonUtil.GetCurrentTime()+" 同步:本轮没有可以上报的数据,将休息5秒!") + fmt.Println(CommonUtil.GetCurrentTime() + " 同步:本轮没有可以上报的数据,将休息5秒!") time.Sleep(5 * 1e9) } } } -// 日志文件对应的结构体 -type logStruct struct { - StartUpdateTs string `json:"start_update_ts"` - IdInt int64 `json:"id_int"` -} - /** * 功能:上报所有大于给定时间戳的数据 * 作者:黄海 @@ -66,7 +95,7 @@ func UploadData() int { //查询结果集 var list []map[string]interface{} //表名 - tableName := sqlDict[i] + tableName := sqlDict[i].TableName //日志文件位置 logName := progressFilePath + tableName + ".log" //判断文件是不是存在 @@ -77,7 +106,7 @@ func UploadData() int { startUpdateTs = logstruct.StartUpdateTs idInt = logstruct.IdInt } else { - fmt.Println(CommonUtil.GetCurrentTime()+" 解析JSON文件失败:" + err.Error()) + fmt.Println(CommonUtil.GetCurrentTime() + " 解析JSON文件失败:" + err.Error()) } } //一直循环 @@ -87,9 +116,32 @@ func UploadData() int { if len(list) == 0 { break } - //上报到Http Api - //TODO + //上报到Http Api--->Body--->Post + var ps postStruct + ps.DataSource = sqlDict[i].DataSource + ps.AuthToken = "DSDataex_Token_eb4ab2fea87161dc08fa794a648584c4" + ps.SystemId = "BASE_GO" + var dsMap = make([]dataStruct, 0) + for k := range list { + var ds dataStruct + ds.Data, _ = CommonUtil.MapToJson(list[k]) + ds.DataId = list[k][sqlDict[i].PrimaryKey].(string) + ds.DelFlag = list[k]["del_flag"].(int64) + ds.OrgId = list[k]["bureau_id"].(string) + dsMap = append(dsMap, ds) + } + ps.Datas = dsMap + //将Struct转为json + jsonBytes, _ := json.Marshal(ps) + msg := string(jsonBytes) + //提交到汇集中心 + p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg) + if !p.Success { + fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:"+p.Message) + time.Sleep(5 * 1e9) + continue + } //记录上报数量 count = count + len(list) //记录日志 @@ -102,7 +154,7 @@ func UploadData() int { } FileUtil.WriteContent(logName, string(jsonBytes)) //提示信息 - fmt.Println(CommonUtil.GetCurrentTime()+" 同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + + fmt.Println(CommonUtil.GetCurrentTime() + " 同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + "个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertInt64ToString(idInt)) } postCount = postCount + count @@ -178,3 +230,43 @@ func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) ( return lastUpdatedTime, idInt, nil } } + +type FailResultStruct struct { + FailId string `json:"fail_id"` + FailReason string `json:"fail_reason"` +} + +type ResultStruct struct { + Message string `json:"message"` + Success bool `json:"success"` + FailResults []FailResultStruct `json:"fail_results"` +} + +// 基础方法,这里多用于访问webapi,配合上json转换。此方法可以运行但是不算完善。 +func httpDo(method string, url string, msg string) ResultStruct { + var p ResultStruct + p.Success=false + p.Message="上报到汇集系统失败!" + client := &http.Client{} + body := bytes.NewBuffer([]byte(msg)) + req, err := http.NewRequest(method, + url, + body) + if err != nil { + // handle error + } + req.Header.Set("Content-Type", "application/json;charset=utf-8") + resp, err := client.Do(req) + if err != nil { + fmt.Println(err) + return p + } + defer resp.Body.Close() + resultBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println(err) + return p + } + json.Unmarshal(resultBody, &p) + return p +} diff --git a/dsBaseRpc/Utils/CommonUtil/CommonUtil.go b/dsBaseRpc/Utils/CommonUtil/CommonUtil.go index 251d7fa0..497da7e7 100644 --- a/dsBaseRpc/Utils/CommonUtil/CommonUtil.go +++ b/dsBaseRpc/Utils/CommonUtil/CommonUtil.go @@ -631,3 +631,26 @@ func CopyFields(sourceStruct interface{}, targetStruct interface{}, fields ...st } return } +// Convert json string to map +func JsonToMap(jsonStr string) (map[string]string, error) { + m := make(map[string]string) + err := json.Unmarshal([]byte(jsonStr), &m) + if err != nil { + fmt.Printf("Unmarshal with error: %+v\n", err) + return nil, err + } + for k, v := range m { + fmt.Printf("%v: %v\n", k, v) + } + return m, nil +} + +// Convert map json string +func MapToJson(m map[string]interface{}) (string, error) { + jsonByte, err := json.Marshal(m) + if err != nil { + fmt.Printf("Marshal with error: %+v\n", err) + return "", nil + } + return string(jsonByte), nil +} diff --git a/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go b/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go index a202e585..cb947408 100644 --- a/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go +++ b/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go @@ -38,6 +38,9 @@ var ( ProjectName string //kafka地址 KafkaAddress string + + //数据汇集中心地址 + DataExchangeUrl string ) func init() { @@ -92,6 +95,9 @@ func init() { ProjectName = iniParser.GetString("project", "project_name") //kafka地址 KafkaAddress = iniParser.GetString("kafka", "KafkaAddress") + + //数据汇集中心地址 + DataExchangeUrl=iniParser.GetString("dataExchange","url") } type IniParser struct {