master
huanghai 5 years ago
parent 319590a92e
commit 166766c920

@ -45,6 +45,7 @@ func main() {
//要同步的索引名称,也就是表名称 //要同步的索引名称,也就是表名称
indexName := "org_school" indexName := "org_school"
keys := make([]string, 0)
//从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持 //从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持
sql := "select * from t_dw_table where table_name=?" sql := "select * from t_dw_table where table_name=?"
list, _ := mysqlDb.SQL(sql, indexName).Query().List() list, _ := mysqlDb.SQL(sql, indexName).Query().List()
@ -54,7 +55,7 @@ func main() {
pk := list[0]["field_name"].(string) pk := list[0]["field_name"].(string)
//取所有 //取所有
CTX := context.Background() CTX := context.Background()
result, err := esClient.Scroll().Index(indexName).Size(100).Do(CTX) result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -63,10 +64,10 @@ func main() {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte) resStr := string(resByte)
value := gjson.Get(resStr, "data_content") value := gjson.Get(resStr, "data_content")
addRecord(pk, value.String()) keys = addRecord(pk, value.String())
} }
//批量执行 //批量执行
batchSave(indexName, pk) batchSave(indexName, pk, keys)
//继续用的 scoll_id //继续用的 scoll_id
scrollId := result.ScrollId scrollId := result.ScrollId
@ -90,20 +91,21 @@ func main() {
resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
resStr := string(resByte) resStr := string(resByte)
value := gjson.Get(resStr, "data_content") value := gjson.Get(resStr, "data_content")
addRecord(pk, value.String()) keys = addRecord(pk, value.String())
} }
} else { } else {
//没有数据了 //没有数据了
break break
} }
//批量执行 //批量执行
batchSave(indexName, pk) batchSave(indexName, pk, keys)
} }
} }
//后续优化为到达一定的阀值再提交一次初步定为200一次 //后续优化为到达一定的阀值再提交一次初步定为200一次
func addRecord(pk string, jsonStr string) { func addRecord(pk string, jsonStr string) []string {
var mymap map[string]interface{} var mymap map[string]interface{}
var keys []string
if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil {
//组装pkArray,用来update enable_flag //组装pkArray,用来update enable_flag
switch (mymap[pk]).(type) { switch (mymap[pk]).(type) {
@ -118,7 +120,6 @@ func addRecord(pk string, jsonStr string) {
} }
//另一个任务是组装insertStrArray,准备批量提交 //另一个任务是组装insertStrArray,准备批量提交
//拿到key //拿到key
var keys []string
for k := range mymap { for k := range mymap {
keys = append(keys, k) keys = append(keys, k)
} }
@ -141,50 +142,38 @@ func addRecord(pk string, jsonStr string) {
break break
} }
} }
lineSql = lineSql[0 : len(lineSql)-1] lineSql+="'"+CommonUtil.GetUUID()+"'"
insertStrArray = append(insertStrArray, lineSql) insertStrArray = append(insertStrArray, lineSql)
} else { } else {
fmt.Println(err.Error()) fmt.Println(err.Error())
} }
return keys
} }
//提交 //提交
func batchSave(tableName string, pkName string) { func batchSave(tableName string, pkName string, keys []string) {
//fmt.Println(pkStrArray)
//fmt.Println(insertStrArray)
var pkStr = "" var pkStr = ""
for i := range pkStrArray { for i := range pkStrArray {
pkStr += "'" + pkStrArray[i] + "'," pkStr += "'" + pkStrArray[i] + "',"
} }
pkStr = pkStr[0 : len(pkStr)-1] pkStr = pkStr[0 : len(pkStr)-1]
sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr+")" sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr + ")"
pgDb.SQL(sql).Execute() pgDb.SQL(sql).Execute()
/*
//插入
sql = `insert into ` + tableName + "("
var lineSql = ""
//根据key从m中拿元素就是按顺序拿了 //插入
for _, k := range keys { sql = `insert into ` + tableName + "("
sql += k + "," //根据key从m中拿元素就是按顺序拿了
switch mymap[k].(type) { for _, k := range keys {
case string: sql += k + ","
lineSql += "'" + mymap[k].(string) + "'," }
break sql+="uuid"
case int64:
lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + "," var lineSql = ""
break for i := range insertStrArray {
case float64: lineSql += "(" + insertStrArray[i] + "),"
lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + "," }
break lineSql = lineSql[0 : len(lineSql)-1]
default: sql += ") values " + lineSql + ";"
break pgDb.SQL(sql).Execute()
} fmt.Println("批量执行"+ CommonUtil.ConvertIntToString(len(insertStrArray))+"条.")
}
lineSql = lineSql[0 : len(lineSql)-1]
sql = sql[0 : len(sql)-1]
sql += ") values(" + lineSql + ");"
} else {
fmt.Println(err.Error())
*/
} }

Loading…
Cancel
Save