From 9a553576153e28d4e7d59c4b4d1d629da0fecc40 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 17 Jul 2020 11:18:02 +0800 Subject: [PATCH] 'commit' --- dsBaseRpc/Config/Config.ini | 66 +++++++------ dsBaseRpc/DataExchange/DataExchange.go | 118 ++++++++++++++++++++--- dsBaseRpc/Utils/CommonUtil/CommonUtil.go | 23 +++++ dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go | 6 ++ 4 files changed, 169 insertions(+), 44 deletions(-) diff --git a/dsBaseRpc/Config/Config.ini b/dsBaseRpc/Config/Config.ini index 18c955e9..6db2b79d 100644 --- a/dsBaseRpc/Config/Config.ini +++ b/dsBaseRpc/Config/Config.ini @@ -1,31 +1,35 @@ -[mysql] # mysql的配置项 -ip = server.dsmin.com -port = 22066 -database = base_db_dev -user = root -pwd = DsideaL147258369 - -[distribute] #发布功能的配置 -ip = server.dsmin.com -port = 26611 -user = root -pwd = dsideal -remotePath = /usr/local/dsMin/dsBaseRpc/ -localPath = E:\Work\dsMin\dsBaseRpc\build - -[redis] -ip =server.dsmin.com -port = 18890 -db = 0 -expireTime = 86400 - -# 注册rpc server -[rpcServer] -port = 8001 - -# 本项目名称,用于记录日志 -[project] -project_name = dsBaseRpc - -[kafka] -KafkaAddress = server.dsmin.com:9092 +[mysql] # mysql的配置项 +ip = server.dsmin.com +port = 22066 +database = base_db_dev +user = root +pwd = DsideaL147258369 + +[distribute] #发布功能的配置 +ip = server.dsmin.com +port = 26611 +user = root +pwd = dsideal +remotePath = /usr/local/dsMin/dsBaseRpc/ +localPath = E:\Work\dsMin\dsBaseRpc\build + +[redis] +ip =server.dsmin.com +port = 18890 +db = 0 +expireTime = 86400 + +# 注册rpc server +[rpcServer] +port = 8001 + +# 本项目名称,用于记录日志 +[project] +project_name = dsBaseRpc + +[kafka] +KafkaAddress = server.dsmin.com:9092 + +# 数据汇集的地址 +[dataExchange] +url = http://10.10.14.239:9009/v1/dataex/DataexSet diff --git a/dsBaseRpc/DataExchange/DataExchange.go b/dsBaseRpc/DataExchange/DataExchange.go index 1bab01e4..9612c6b5 100644 --- a/dsBaseRpc/DataExchange/DataExchange.go +++ b/dsBaseRpc/DataExchange/DataExchange.go @@ -1,11 +1,15 @@ package DataExchange import ( + "bytes" "dsBaseRpc/Utils/CommonUtil" + "dsBaseRpc/Utils/ConfigUtil" "dsBaseRpc/Utils/DbUtil" "dsBaseRpc/Utils/FileUtil" "encoding/json" "fmt" + "io/ioutil" + "net/http" "os" "time" ) @@ -15,8 +19,39 @@ var db = DbUtil.Engine // 日志文件路径 var progressFilePath = "/usr/local/SyncDataLogs/" +//建立与汇集中心的主题映射关系结构 +type tableStruct struct { + TableName string `json:"table_name"` + PrimaryKey string `json:"primary_key"` + DataSource string `json:"data_source"` +} + //有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 -var sqlDict = []string{"t_base_organization", "t_base_class"} +var sqlDict = []tableStruct{ + {TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"}, + {TableName: "t_base_class", PrimaryKey: "class_id", DataSource: "org_class"}, +} + +// 数据上报的结构体 +type postStruct struct { + AuthToken string `json:"auth_token"` + DataSource string `json:"data_source"` + SystemId string `json:"system_id"` + Datas []dataStruct `json:"datas"` +} + +type dataStruct struct { + Data string `json:"data"` + DataId string `json:"data_id"` + DelFlag int64 `json:"del_flag"` + OrgId string `json:"org_id"` +} + +// 日志文件对应的结构体 +type logStruct struct { + StartUpdateTs string `json:"start_update_ts"` + IdInt int64 `json:"id_int"` +} /** 功能:初始化目录 @@ -37,18 +72,12 @@ func DataExchange() { //本轮上报的数量,如果是0,休息5秒后再继续上传 postCount := UploadData() if postCount == 0 { - fmt.Println(CommonUtil.GetCurrentTime()+" 同步:本轮没有可以上报的数据,将休息5秒!") + fmt.Println(CommonUtil.GetCurrentTime() + " 同步:本轮没有可以上报的数据,将休息5秒!") time.Sleep(5 * 1e9) } } } -// 日志文件对应的结构体 -type logStruct struct { - StartUpdateTs string `json:"start_update_ts"` - IdInt int64 `json:"id_int"` -} - /** * 功能:上报所有大于给定时间戳的数据 * 作者:黄海 @@ -66,7 +95,7 @@ func UploadData() int { //查询结果集 var list []map[string]interface{} //表名 - tableName := sqlDict[i] + tableName := sqlDict[i].TableName //日志文件位置 logName := progressFilePath + tableName + ".log" //判断文件是不是存在 @@ -77,7 +106,7 @@ func UploadData() int { startUpdateTs = logstruct.StartUpdateTs idInt = logstruct.IdInt } else { - fmt.Println(CommonUtil.GetCurrentTime()+" 解析JSON文件失败:" + err.Error()) + fmt.Println(CommonUtil.GetCurrentTime() + " 解析JSON文件失败:" + err.Error()) } } //一直循环 @@ -87,9 +116,32 @@ func UploadData() int { if len(list) == 0 { break } - //上报到Http Api - //TODO + //上报到Http Api--->Body--->Post + var ps postStruct + ps.DataSource = sqlDict[i].DataSource + ps.AuthToken = "DSDataex_Token_eb4ab2fea87161dc08fa794a648584c4" + ps.SystemId = "BASE_GO" + var dsMap = make([]dataStruct, 0) + for k := range list { + var ds dataStruct + ds.Data, _ = CommonUtil.MapToJson(list[k]) + ds.DataId = list[k][sqlDict[i].PrimaryKey].(string) + ds.DelFlag = list[k]["del_flag"].(int64) + ds.OrgId = list[k]["bureau_id"].(string) + dsMap = append(dsMap, ds) + } + ps.Datas = dsMap + //将Struct转为json + jsonBytes, _ := json.Marshal(ps) + msg := string(jsonBytes) + //提交到汇集中心 + p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg) + if !p.Success { + fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:"+p.Message) + time.Sleep(5 * 1e9) + continue + } //记录上报数量 count = count + len(list) //记录日志 @@ -102,7 +154,7 @@ func UploadData() int { } FileUtil.WriteContent(logName, string(jsonBytes)) //提示信息 - fmt.Println(CommonUtil.GetCurrentTime()+" 同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + + fmt.Println(CommonUtil.GetCurrentTime() + " 同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + "个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertInt64ToString(idInt)) } postCount = postCount + count @@ -178,3 +230,43 @@ func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) ( return lastUpdatedTime, idInt, nil } } + +type FailResultStruct struct { + FailId string `json:"fail_id"` + FailReason string `json:"fail_reason"` +} + +type ResultStruct struct { + Message string `json:"message"` + Success bool `json:"success"` + FailResults []FailResultStruct `json:"fail_results"` +} + +// 基础方法,这里多用于访问webapi,配合上json转换。此方法可以运行但是不算完善。 +func httpDo(method string, url string, msg string) ResultStruct { + var p ResultStruct + p.Success=false + p.Message="上报到汇集系统失败!" + client := &http.Client{} + body := bytes.NewBuffer([]byte(msg)) + req, err := http.NewRequest(method, + url, + body) + if err != nil { + // handle error + } + req.Header.Set("Content-Type", "application/json;charset=utf-8") + resp, err := client.Do(req) + if err != nil { + fmt.Println(err) + return p + } + defer resp.Body.Close() + resultBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println(err) + return p + } + json.Unmarshal(resultBody, &p) + return p +} diff --git a/dsBaseRpc/Utils/CommonUtil/CommonUtil.go b/dsBaseRpc/Utils/CommonUtil/CommonUtil.go index 251d7fa0..497da7e7 100644 --- a/dsBaseRpc/Utils/CommonUtil/CommonUtil.go +++ b/dsBaseRpc/Utils/CommonUtil/CommonUtil.go @@ -631,3 +631,26 @@ func CopyFields(sourceStruct interface{}, targetStruct interface{}, fields ...st } return } +// Convert json string to map +func JsonToMap(jsonStr string) (map[string]string, error) { + m := make(map[string]string) + err := json.Unmarshal([]byte(jsonStr), &m) + if err != nil { + fmt.Printf("Unmarshal with error: %+v\n", err) + return nil, err + } + for k, v := range m { + fmt.Printf("%v: %v\n", k, v) + } + return m, nil +} + +// Convert map json string +func MapToJson(m map[string]interface{}) (string, error) { + jsonByte, err := json.Marshal(m) + if err != nil { + fmt.Printf("Marshal with error: %+v\n", err) + return "", nil + } + return string(jsonByte), nil +} diff --git a/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go b/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go index a202e585..cb947408 100644 --- a/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go +++ b/dsBaseRpc/Utils/ConfigUtil/ConfigUtil.go @@ -38,6 +38,9 @@ var ( ProjectName string //kafka地址 KafkaAddress string + + //数据汇集中心地址 + DataExchangeUrl string ) func init() { @@ -92,6 +95,9 @@ func init() { ProjectName = iniParser.GetString("project", "project_name") //kafka地址 KafkaAddress = iniParser.GetString("kafka", "KafkaAddress") + + //数据汇集中心地址 + DataExchangeUrl=iniParser.GetString("dataExchange","url") } type IniParser struct {