package DataExchange import ( "bytes" "dsBaseRpc/Utils/CommonUtil" "dsBaseRpc/Utils/ConfigUtil" "dsBaseRpc/Utils/DbUtil" "dsBaseRpc/Utils/FileUtil" "encoding/json" "fmt" "io/ioutil" "net/http" "os" "time" ) var db = DbUtil.Engine //每次获取的条数 var limit = 100 // 日志文件路径 var progressFilePath = "/usr/local/SyncDataLogs/" //建立与汇集中心的主题映射关系结构 type tableStruct struct { TableName string `json:"table_name"` PrimaryKey string `json:"primary_key"` DataSource string `json:"data_source"` } //有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 var sqlDict = []tableStruct{ {TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"}, {TableName: "t_base_class", PrimaryKey: "class_id", DataSource: "org_class"}, } // 数据上报的结构体 type postStruct struct { AuthToken string `json:"auth_token"` DataSource string `json:"data_source"` SystemId string `json:"system_id"` Datas []dataStruct `json:"datas"` } type dataStruct struct { Data string `json:"data"` DataId string `json:"data_id"` DelFlag int64 `json:"del_flag"` OrgId string `json:"org_id"` } // 日志文件对应的结构体 type logStruct struct { StartUpdateTs string `json:"start_update_ts"` IdInt int64 `json:"id_int"` } /** 功能:初始化目录 */ func init() { if !FileUtil.PathExists(progressFilePath) { os.MkdirAll(progressFilePath, os.ModePerm) } } /** 功能:数据上报 作者:黄海 时间:2020-07-16 */ func DataExchange() { //是不是进行过首次上报,如果没有话,需要执行一次组织机构上报 //日志文件位置 logName := progressFilePath + "orgInit.log" //判断文件是不是存在 if !FileUtil.PathExists(logName) { //上报组织机构,读取t_base_organization的SQL脚本 //SQL内容 sql := FileUtil.ReadFileContent("./Sql/t_base_organization.sql") //组织机构是需要按org_type,area_code排序 orgSql := sql + " order by t1.org_type,t1.area_code" list, _ := db.SQL(orgSql).Query().List() var t tableStruct for i := range sqlDict { if sqlDict[i].TableName == "t_base_organization" { t = sqlDict[i] break } } var count = 0 var isFinish = false for { if isFinish { break } //利用切片分批次上报 if len(list) > limit { success:=PostToServer(t, list[0:limit]) if !success{ continue } count = count + limit list = list[limit:] } else if len(list) > 0 { PostToServer(t, list) count = count + len(list) isFinish = true } else { isFinish = true } fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成组织机构初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!") } //记录日志,内容为1,表示组织机构初始化上报过了~ FileUtil.WriteContent(logName, "ok") } for { //本轮上报的数量,如果是0,休息5秒后再继续上传 postCount := UploadData() if postCount == 0 { fmt.Println(CommonUtil.GetCurrentTime() + " 同步:本轮没有可以上报的数据,将休息5秒!") time.Sleep(5 * 1e9) } } } /** * 功能:上报所有大于给定时间戳的数据 * 作者:黄海 * 时间:2019-01-02 */ func UploadData() int { var postCount = 0 //遍历所有的配置节,进行循环 for i := range sqlDict { count := 0 //默认的开始时间 startUpdateTs := "1970-01-01 00:00:00" //默认的整数主键值 var idInt int64 = 0 //查询结果集 var list []map[string]interface{} //表名 tableName := sqlDict[i].TableName //日志文件位置 logName := progressFilePath + tableName + ".log" //判断文件是不是存在 if FileUtil.PathExists(logName) { //读取配置信息 var logstruct logStruct if err := json.Unmarshal([]byte(FileUtil.ReadFileContent(logName)), &logstruct); err == nil { startUpdateTs = logstruct.StartUpdateTs idInt = logstruct.IdInt } else { fmt.Println(CommonUtil.GetCurrentTime() + " 解析JSON文件失败:" + err.Error()) } } //一直循环 for { //获取数据 startUpdateTs, idInt, list = getRecord(tableName, startUpdateTs, idInt) if len(list) == 0 { break } //上报到Http Api--->Body--->Post success := PostToServer(sqlDict[i], list) if !success { continue } //记录上报数量 count = count + len(list) //记录日志 var l logStruct l.IdInt = idInt l.StartUpdateTs = startUpdateTs jsonBytes, err := json.Marshal(l) if err != nil { fmt.Println(err.Error()) } FileUtil.WriteContent(logName, string(jsonBytes)) //提示信息 fmt.Println(CommonUtil.GetCurrentTime() + " 同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + "个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertInt64ToString(idInt)) } postCount = postCount + count } return postCount } /** 功能:将数据批量上报到汇集中心 作者:黄海 时间:2020-07-17 */ func PostToServer(t tableStruct, list []map[string]interface{}) bool { //上报到Http Api--->Body--->Post var ps postStruct ps.DataSource = t.DataSource ps.AuthToken = "DSDataex_Token_eb4ab2fea87161dc08fa794a648584c4" ps.SystemId = "BASE_GO" var dsMap = make([]dataStruct, 0) for k := range list { var ds dataStruct ds.Data, _ = CommonUtil.MapToJson(list[k]) ds.DataId = list[k][t.PrimaryKey].(string) ds.DelFlag = list[k]["del_flag"].(int64) ds.OrgId = list[k]["bureau_id"].(string) dsMap = append(dsMap, ds) } ps.Datas = dsMap //将Struct转为json jsonBytes, _ := json.Marshal(ps) msg := string(jsonBytes) //提交到汇集中心 p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg) fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:" + p.Message) time.Sleep(5 * 1e9) return p.Success } /** * 功能:获取一个表的指定大小的数据 * 作者:黄海 * 时间:2019-01-16 */ func getRecord(tableName string, lastUpdatedTime string, idInt int64) (string, int64, []map[string]interface{}) { //SQL内容 sqlPublic := FileUtil.ReadFileContent("./Sql/" + tableName + ".sql") //时间戳相等的SQL eqSql := sqlPublic + " where t1.last_updated_time =? and t1.id_int>? order by t1.last_updated_time,t1.id_int limit ?" //时间戳大于的SQL gtSql := sqlPublic + " where t1.last_updated_time >? order by t1.last_updated_time,t1.id_int limit ?" //(1)计算相等的 var rsEq = make([]map[string]interface{}, 0) var rsGt = make([]map[string]interface{}, 0) lastUpdatedTime, idInt, rsEq = getRecordEq(eqSql, lastUpdatedTime, idInt, limit) if len(rsEq) < limit { //尝试一次gt操作(组合一下) lastUpdatedTime, idInt, rsGt = getRecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq)) if len(rsGt) > 0 { //拼装一下,合并两个结果集 for i := range rsGt { rsEq = append(rsEq, rsGt[i]) } //更改一下 lastUpdatedTime = rsGt[len(rsGt)-1]["last_updated_time"].(string) idInt = rsGt[len(rsGt)-1]["id_int"].(int64) } } return lastUpdatedTime, idInt, rsEq } /** 功能:获取时间戳一致的数据 作者:黄海 时间:2020-7-17 */ func getRecordEq(eqSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) { rs, _ := db.SQL(eqSql, lastUpdatedTime, idInt, limit).Query().List() if rs != nil && len(rs) > 0 { //取出最后一行的数据进行回写 lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) idInt = rs[len(rs)-1]["id_int"].(int64) return lastUpdatedTime, idInt, rs } else { return lastUpdatedTime, idInt, nil } } /** 功能:获取大于指定时间戳的数据 作者:黄海 时间:2020-7-17 */ func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) { rs, _ := db.SQL(gtSql, lastUpdatedTime, limit).Query().List() if rs != nil && len(rs) > 0 { //取出最后一行的数据进行回写 lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) idInt := rs[len(rs)-1]["id_int"].(int64) return lastUpdatedTime, idInt, rs } else { return lastUpdatedTime, idInt, nil } } type FailResultStruct struct { FailId string `json:"fail_id"` FailReason string `json:"fail_reason"` } type ResultStruct struct { Message string `json:"message"` Success bool `json:"success"` FailResults []FailResultStruct `json:"fail_results"` } // 基础方法,这里多用于访问webapi,配合上json转换。此方法可以运行但是不算完善。 func httpDo(method string, url string, msg string) ResultStruct { var p ResultStruct p.Success = false p.Message = "上报到汇集系统失败!" client := &http.Client{} body := bytes.NewBuffer([]byte(msg)) req, err := http.NewRequest(method, url, body) if err != nil { // handle error } req.Header.Set("Content-Type", "application/json;charset=utf-8") resp, err := client.Do(req) if err != nil { fmt.Println(err) return p } defer resp.Body.Close() resultBody, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Println(err) return p } json.Unmarshal(resultBody, &p) return p }