master
huanghai 5 years ago
parent 3902d55295
commit 9a55357615

@ -1,31 +1,35 @@
[mysql] # mysql的配置项
ip = server.dsmin.com
port = 22066
database = base_db_dev
user = root
pwd = DsideaL147258369
[distribute] #发布功能的配置
ip = server.dsmin.com
port = 26611
user = root
pwd = dsideal
remotePath = /usr/local/dsMin/dsBaseRpc/
localPath = E:\Work\dsMin\dsBaseRpc\build
[redis]
ip =server.dsmin.com
port = 18890
db = 0
expireTime = 86400
# 注册rpc server
[rpcServer]
port = 8001
# 本项目名称,用于记录日志
[project]
project_name = dsBaseRpc
[kafka]
KafkaAddress = server.dsmin.com:9092
[mysql] # mysql的配置项
ip = server.dsmin.com
port = 22066
database = base_db_dev
user = root
pwd = DsideaL147258369
[distribute] #发布功能的配置
ip = server.dsmin.com
port = 26611
user = root
pwd = dsideal
remotePath = /usr/local/dsMin/dsBaseRpc/
localPath = E:\Work\dsMin\dsBaseRpc\build
[redis]
ip =server.dsmin.com
port = 18890
db = 0
expireTime = 86400
# 注册rpc server
[rpcServer]
port = 8001
# 本项目名称,用于记录日志
[project]
project_name = dsBaseRpc
[kafka]
KafkaAddress = server.dsmin.com:9092
# 数据汇集的地址
[dataExchange]
url = http://10.10.14.239:9009/v1/dataex/DataexSet

@ -1,11 +1,15 @@
package DataExchange
import (
"bytes"
"dsBaseRpc/Utils/CommonUtil"
"dsBaseRpc/Utils/ConfigUtil"
"dsBaseRpc/Utils/DbUtil"
"dsBaseRpc/Utils/FileUtil"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"time"
)
@ -15,8 +19,39 @@ var db = DbUtil.Engine
// 日志文件路径
var progressFilePath = "/usr/local/SyncDataLogs/"
//建立与汇集中心的主题映射关系结构
type tableStruct struct {
TableName string `json:"table_name"`
PrimaryKey string `json:"primary_key"`
DataSource string `json:"data_source"`
}
//有同步哪些表,之所以不遍历文件的名称进行上报,是因为需要控制上传的顺序,如果只是文件名,就丢失了顺序
var sqlDict = []string{"t_base_organization", "t_base_class"}
var sqlDict = []tableStruct{
{TableName: "t_base_organization", PrimaryKey: "org_id", DataSource: "org_school"},
{TableName: "t_base_class", PrimaryKey: "class_id", DataSource: "org_class"},
}
// 数据上报的结构体
type postStruct struct {
AuthToken string `json:"auth_token"`
DataSource string `json:"data_source"`
SystemId string `json:"system_id"`
Datas []dataStruct `json:"datas"`
}
type dataStruct struct {
Data string `json:"data"`
DataId string `json:"data_id"`
DelFlag int64 `json:"del_flag"`
OrgId string `json:"org_id"`
}
// 日志文件对应的结构体
type logStruct struct {
StartUpdateTs string `json:"start_update_ts"`
IdInt int64 `json:"id_int"`
}
/**
@ -37,18 +72,12 @@ func DataExchange() {
//本轮上报的数量如果是0休息5秒后再继续上传
postCount := UploadData()
if postCount == 0 {
fmt.Println(CommonUtil.GetCurrentTime()+" 同步本轮没有可以上报的数据将休息5秒")
fmt.Println(CommonUtil.GetCurrentTime() + " 同步本轮没有可以上报的数据将休息5秒")
time.Sleep(5 * 1e9)
}
}
}
// 日志文件对应的结构体
type logStruct struct {
StartUpdateTs string `json:"start_update_ts"`
IdInt int64 `json:"id_int"`
}
/**
*
*
@ -66,7 +95,7 @@ func UploadData() int {
//查询结果集
var list []map[string]interface{}
//表名
tableName := sqlDict[i]
tableName := sqlDict[i].TableName
//日志文件位置
logName := progressFilePath + tableName + ".log"
//判断文件是不是存在
@ -77,7 +106,7 @@ func UploadData() int {
startUpdateTs = logstruct.StartUpdateTs
idInt = logstruct.IdInt
} else {
fmt.Println(CommonUtil.GetCurrentTime()+" 解析JSON文件失败:" + err.Error())
fmt.Println(CommonUtil.GetCurrentTime() + " 解析JSON文件失败:" + err.Error())
}
}
//一直循环
@ -87,9 +116,32 @@ func UploadData() int {
if len(list) == 0 {
break
}
//上报到Http Api
//TODO
//上报到Http Api--->Body--->Post
var ps postStruct
ps.DataSource = sqlDict[i].DataSource
ps.AuthToken = "DSDataex_Token_eb4ab2fea87161dc08fa794a648584c4"
ps.SystemId = "BASE_GO"
var dsMap = make([]dataStruct, 0)
for k := range list {
var ds dataStruct
ds.Data, _ = CommonUtil.MapToJson(list[k])
ds.DataId = list[k][sqlDict[i].PrimaryKey].(string)
ds.DelFlag = list[k]["del_flag"].(int64)
ds.OrgId = list[k]["bureau_id"].(string)
dsMap = append(dsMap, ds)
}
ps.Datas = dsMap
//将Struct转为json
jsonBytes, _ := json.Marshal(ps)
msg := string(jsonBytes)
//提交到汇集中心
p := httpDo("POST", ConfigUtil.DataExchangeUrl, msg)
if !p.Success {
fmt.Println(CommonUtil.GetCurrentTime() + " 同步上报到数据汇集中心失败将休息5秒后重试错误原因"+p.Message)
time.Sleep(5 * 1e9)
continue
}
//记录上报数量
count = count + len(list)
//记录日志
@ -102,7 +154,7 @@ func UploadData() int {
}
FileUtil.WriteContent(logName, string(jsonBytes))
//提示信息
fmt.Println(CommonUtil.GetCurrentTime()+" 同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) +
fmt.Println(CommonUtil.GetCurrentTime() + " 同步:" + tableName + ",上报" + CommonUtil.ConvertIntToString(count) +
"个start_update_ts=" + startUpdateTs + ",id_int=" + CommonUtil.ConvertInt64ToString(idInt))
}
postCount = postCount + count
@ -178,3 +230,43 @@ func getRecordGt(gtSql string, lastUpdatedTime string, idInt int64, limit int) (
return lastUpdatedTime, idInt, nil
}
}
type FailResultStruct struct {
FailId string `json:"fail_id"`
FailReason string `json:"fail_reason"`
}
type ResultStruct struct {
Message string `json:"message"`
Success bool `json:"success"`
FailResults []FailResultStruct `json:"fail_results"`
}
// 基础方法这里多用于访问webapi配合上json转换。此方法可以运行但是不算完善。
func httpDo(method string, url string, msg string) ResultStruct {
var p ResultStruct
p.Success=false
p.Message="上报到汇集系统失败!"
client := &http.Client{}
body := bytes.NewBuffer([]byte(msg))
req, err := http.NewRequest(method,
url,
body)
if err != nil {
// handle error
}
req.Header.Set("Content-Type", "application/json;charset=utf-8")
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
return p
}
defer resp.Body.Close()
resultBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
return p
}
json.Unmarshal(resultBody, &p)
return p
}

@ -631,3 +631,26 @@ func CopyFields(sourceStruct interface{}, targetStruct interface{}, fields ...st
}
return
}
// Convert json string to map
func JsonToMap(jsonStr string) (map[string]string, error) {
m := make(map[string]string)
err := json.Unmarshal([]byte(jsonStr), &m)
if err != nil {
fmt.Printf("Unmarshal with error: %+v\n", err)
return nil, err
}
for k, v := range m {
fmt.Printf("%v: %v\n", k, v)
}
return m, nil
}
// Convert map json string
func MapToJson(m map[string]interface{}) (string, error) {
jsonByte, err := json.Marshal(m)
if err != nil {
fmt.Printf("Marshal with error: %+v\n", err)
return "", nil
}
return string(jsonByte), nil
}

@ -38,6 +38,9 @@ var (
ProjectName string
//kafka地址
KafkaAddress string
//数据汇集中心地址
DataExchangeUrl string
)
func init() {
@ -92,6 +95,9 @@ func init() {
ProjectName = iniParser.GetString("project", "project_name")
//kafka地址
KafkaAddress = iniParser.GetString("kafka", "KafkaAddress")
//数据汇集中心地址
DataExchangeUrl=iniParser.GetString("dataExchange","url")
}
type IniParser struct {

Loading…
Cancel
Save