package DataExchange import ( "bytes" "crypto/md5" "dsBaseRpc/Utils/CommonUtil" "dsBaseRpc/Utils/ConfigUtil" "dsBaseRpc/Utils/DbUtil" "dsBaseRpc/Utils/FileUtil" "encoding/json" "fmt" "io" "io/ioutil" "net/http" "os" "time" ) var db = DbUtil.Engine //每次获取的条数 var limit = 200 //默认开始时间 var defaultStartTs = "1970-01-01 00:00:00" // 日志文件路径 var progressFilePath = "/usr/local/SyncDataLogs/" //建立与汇集中心的主题映射关系结构 type tableStruct struct { TableName string `json:"table_name"` PrimaryKey string `json:"primary_key"` DataSource string `json:"data_source"` } //有同步哪些表(增量),之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序 var IncrSqlDict = []tableStruct{ {TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"}, {TableName: "t_base_class", PrimaryKey: "class_id", DataSource: "org_class"}, {TableName: "t_base_teacher", PrimaryKey: "teacher_id", DataSource: "user_teacher"}, {TableName: "t_base_student", PrimaryKey: "student_id", DataSource: "user_student"}, {TableName: "t_sys_loginperson_log", PrimaryKey: "id_int", DataSource: "log_login"}, } // 全量数据上报 var FullSqlDict = []tableStruct{ {TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"}, {TableName: "t_sys_dict", PrimaryKey: "dict_id", DataSource: "sys_dic"}, {TableName: "t_gov_area", PrimaryKey: "area_code", DataSource: "org_area"}, } // 数据上报的结构体 type postStruct struct { AuthToken string `json:"auth_token"` DataSource string `json:"data_source"` SystemId string `json:"system_id"` Datas []dataStruct `json:"datas"` } type dataStruct struct { Data string `json:"data"` DataId string `json:"data_id"` DelFlag int64 `json:"del_flag"` OrgId string `json:"org_id"` } // 日志文件对应的结构体 type logStruct struct { StartUpdateTs string `json:"start_update_ts"` IdInt int64 `json:"id_int"` } //系统token var SystemToken="" //是否成功 var success =false /** 功能:初始化目录 */ func init() { if !FileUtil.PathExists(progressFilePath) { os.MkdirAll(progressFilePath, os.ModePerm) } } /** 功能:全量数据上报 作者:黄海 时间:2020-07-17 */ func InitFull() { for i := range FullSqlDict { //(1)是不是进行过首次上报,如果没有话,需要执行一次组织机构上报 logName := progressFilePath + FullSqlDict[i].TableName + ".log" //判断文件是不是存在 if !FileUtil.PathExists(logName) { //SQL内容 isql := FileUtil.ReadFileContent("./Sql/" + FullSqlDict[i].TableName + ".sql") sql := isql var list []map[string]interface{} //如果是组织机构表,那么需要变更一下查询的排序条件 if FullSqlDict[i].TableName == "t_base_organization" { //组织机构是需要按org_type,area_code排序 sql = sql + " order by t1.org_type,t1.area_code" } list, _ = db.SQL(sql).Query().List() var count = 0 var isFinish = false for { if isFinish { break } //利用切片分批次上报 if len(list) > limit { success := PostToServer(FullSqlDict[i], list[0:limit]) //0-99不包含100 if !success { continue } count = count + limit list = list[limit:] } else if len(list) > 0 { success := PostToServer(FullSqlDict[i], list) if !success { continue } count = count + len(list) isFinish = true } else { isFinish = true } fmt.Println(CommonUtil.GetCurrentTime() + " 同步:成功完成" + FullSqlDict[i].TableName + "初始化上报,本次完成" + CommonUtil.ConvertIntToString(count) + "条!") } //对于组织机构进行特殊处理 if FullSqlDict[i].TableName == "t_base_organization" { //记录日志 maxSql := isql + " order by t1.last_updated_time desc,t1.id_int desc limit 1" var l logStruct l.IdInt = 0 l.StartUpdateTs = defaultStartTs //取得最后一行的最大值 list, _ = db.SQL(maxSql).Query().List() if len(list) > 0 { l.StartUpdateTs = list[len(list)-1]["last_updated_time"].(string) l.IdInt = list[len(list)-1]["id_int"].(int64) } jsonBytes, err := json.Marshal(l) if err != nil { fmt.Println(err.Error()) } FileUtil.WriteContent(logName, string(jsonBytes)) } else { FileUtil.WriteContent(logName, "is finished!") } } } } /** 功能:数据上报 作者:黄海 时间:2020-07-16 */ func DataExchange() { //获取系统token success,SystemToken=getSystemToken() if !success{ return } //死循环上报中 for { //(1)组织机构上报 InitFull() //(2)本轮上报的数量,如果是0,休息5秒后再继续上传 postCount := UploadData() if postCount == 0 { fmt.Println(CommonUtil.GetCurrentTime() + " 同步:本轮没有可以上报的数据,将休息5秒!") time.Sleep(5 * 1e9) } } } /** * 功能:上报所有大于给定时间戳的数据 * 作者:黄海 * 时间:2019-01-02 */ func UploadData() int { var postCount = 0 //遍历所有的配置节,进行循环 for i := range IncrSqlDict { count := 0 //默认的开始时间 startUpdateTs := defaultStartTs //默认的整数主键值 var idInt int64 = 0 //查询结果集 var list []map[string]interface{} //表名 tableName := IncrSqlDict[i].TableName //日志文件位置 logName := progressFilePath + tableName + ".log" //判断文件是不是存在 if FileUtil.PathExists(logName) { //读取配置信息 var logstruct logStruct if err := json.Unmarshal([]byte(FileUtil.ReadFileContent(logName)), &logstruct); err == nil { startUpdateTs = logstruct.StartUpdateTs idInt = logstruct.IdInt } else { fmt.Println(CommonUtil.GetCurrentTime() + " 解析JSON文件失败:" + err.Error()) } } //一直循环 for { //获取数据 startUpdateTs, idInt, list = getRecord(tableName, startUpdateTs, idInt) if len(list) == 0 { break } //上报到Http Api--->Body--->Post success := PostToServer(IncrSqlDict[i], list) if !success { continue } //记录上报数量 count = count + len(list) //记录日志 var l logStruct l.IdInt = idInt l.StartUpdateTs = startUpdateTs jsonBytes, err := json.Marshal(l) if err != nil { fmt.Println(err.Error()) } FileUtil.WriteContent(logName, string(jsonBytes)) //提示信息 fmt.Println(CommonUtil.GetCurrentTime() + " 同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) + "个,start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertInt64ToString(idInt)) } postCount = postCount + count } return postCount } // 获取签名用的结构体 type authStruct struct { AuthTime string `json:"auth_time"` SystemId string `json:"system_id"` SystemToken string `json:"system_token"` } /** 功能:获取系统token 作者:黄海 时间:2020-07-22 */ func getSystemToken() (bool, string) { //(1)计算出system_token=MD5.hash(MD5.hash(system_id+auth_time)+system_key) var as authStruct as.AuthTime = CommonUtil.GetCurrentTime() as.SystemId = ConfigUtil.DataExchangeSystemId //计算 md5 w := md5.New() io.WriteString(w, as.SystemId+as.AuthTime) //将str写入到w中 md5str := fmt.Sprintf("%x", w.Sum(nil)) w = md5.New() io.WriteString(w, md5str+ConfigUtil.DataExchangeSystemKey) //将str写入到w中 md5str = fmt.Sprintf("%x", w.Sum(nil)) //系统token as.SystemToken = md5str //(2)根据system_token换取authToken jsonBytes, _ := json.Marshal(as) p := httpDo("POST", ConfigUtil.DataExchangeSystemAuthUrl, string(jsonBytes)) if !p.Success { fmt.Println(CommonUtil.GetCurrentTime() + "获取认证签名失败!") return false,"获取认证签名失败!" } return true,p.Message } /** 功能:将数据批量上报到汇集中心 作者:黄海 时间:2020-07-17 */ func PostToServer(t tableStruct, list []map[string]interface{}) bool { //上报到Http Api--->Body--->Post var ps postStruct ps.DataSource = t.DataSource ps.AuthToken = SystemToken //p.Message中记录了authToken ps.SystemId = ConfigUtil.DataExchangeSystemId var dsMap = make([]dataStruct, 0) for k := range list { var ds dataStruct ds.Data, _ = CommonUtil.MapToJson(list[k]) switch list[k][t.PrimaryKey].(type) { case int64: ds.DataId = CommonUtil.ConvertInt64ToString(list[k][t.PrimaryKey].(int64)) break default: ds.DataId = list[k][t.PrimaryKey].(string) break } ds.DelFlag = list[k]["del_flag"].(int64) ds.OrgId = list[k]["bureau_id"].(string) dsMap = append(dsMap, ds) } ps.Datas = dsMap //将Struct转为json jsonBytes, _ := json.Marshal(ps) msg := string(jsonBytes) //提交到汇集中心 p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg) if !p.Success { fmt.Println(CommonUtil.GetCurrentTime() + " 同步:上报到数据汇集中心失败,将休息5秒后重试!错误原因:" + p.Message) time.Sleep(5 * 1e9) } return p.Success } /** * 功能:获取一个表的指定大小的数据 * 作者:黄海 * 时间:2019-01-16 */ func getRecord(tableName string, lastUpdatedTime string, idInt int64) (string, int64, []map[string]interface{}) { //SQL内容 sqlPublic := FileUtil.ReadFileContent("./Sql/" + tableName + ".sql") //时间戳相等的SQL eqSql := sqlPublic + " where t1.last_updated_time =? and t1.id_int>? order by t1.last_updated_time,t1.id_int limit ?" //时间戳大于的SQL gtSql := sqlPublic + " where t1.last_updated_time >? order by t1.last_updated_time,t1.id_int limit ?" //(1)计算相等的 var rsEq = make([]map[string]interface{}, 0) var rsGt = make([]map[string]interface{}, 0) lastUpdatedTime, idInt, rsEq = getRecordEq(eqSql, lastUpdatedTime, idInt, limit) if rsEq == nil || len(rsEq) < limit { //尝试一次gt操作(组合一下) lastUpdatedTime, idInt, rsGt = getRecordGt(gtSql, lastUpdatedTime, idInt, limit-len(rsEq)) if len(rsGt) > 0 { //拼装一下,合并两个结果集 for i := range rsGt { rsEq = append(rsEq, rsGt[i]) } //更改一下 lastUpdatedTime = rsGt[len(rsGt)-1]["last_updated_time"].(string) idInt = rsGt[len(rsGt)-1]["id_int"].(int64) } } return lastUpdatedTime, idInt, rsEq } /** 功能:获取时间戳一致的数据 作者:黄海 时间:2020-7-17 */ func getRecordEq(eqSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) { rs, _ := db.SQL(eqSql, lastUpdatedTime, idInt, limit).Query().List() if rs != nil && len(rs) > 0 { //取出最后一行的数据进行回写 lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) idInt = rs[len(rs)-1]["id_int"].(int64) return lastUpdatedTime, idInt, rs } else { return lastUpdatedTime, idInt, nil } } /** 功能:获取大于指定时间戳的数据 作者:黄海 时间:2020-7-17 */ func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) (string, int64, []map[string]interface{}) { rs, _ := db.SQL(gtSql, lastUpdatedTime, limit).Query().List() if rs != nil && len(rs) > 0 { //取出最后一行的数据进行回写 lastUpdatedTime = rs[len(rs)-1]["last_updated_time"].(string) idInt := rs[len(rs)-1]["id_int"].(int64) return lastUpdatedTime, idInt, rs } else { return lastUpdatedTime, idInt, nil } } type ResultStruct struct { Message string `json:"message"` Success bool `json:"success"` } // 基础方法,这里多用于访问webapi,配合上json转换。 func httpDo(method string, url string, msg string) ResultStruct { var p ResultStruct p.Success = false p.Message = "上报到汇集系统失败!请检查是否SystemToken是有效的,或者有两个及以上客户端同时在上报。" client := &http.Client{} body := bytes.NewBuffer([]byte(msg)) req, err := http.NewRequest(method, url, body) if err != nil { // handle error } req.Header.Set("Content-Type", "application/json;charset=utf-8") resp, err := client.Do(req) if err != nil { fmt.Println(err) return p } defer resp.Body.Close() resultBody, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Println(err) return p } json.Unmarshal(resultBody, &p) return p }