diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 3cc41826..c676b6dd 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -45,6 +45,7 @@ func main() { //要同步的索引名称,也就是表名称 indexName := "org_school" + keys := make([]string, 0) //从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持 sql := "select * from t_dw_table where table_name=?" list, _ := mysqlDb.SQL(sql, indexName).Query().List() @@ -54,7 +55,7 @@ func main() { pk := list[0]["field_name"].(string) //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(100).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX) if err != nil { panic(err) } @@ -63,10 +64,10 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(pk, value.String()) + keys = addRecord(pk, value.String()) } //批量执行 - batchSave(indexName, pk) + batchSave(indexName, pk, keys) //继续用的 scoll_id scrollId := result.ScrollId @@ -90,20 +91,21 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(pk, value.String()) + keys = addRecord(pk, value.String()) } } else { //没有数据了 break } //批量执行 - batchSave(indexName, pk) + batchSave(indexName, pk, keys) } } //后续优化为到达一定的阀值再提交一次,初步定为200一次 -func addRecord(pk string, jsonStr string) { +func addRecord(pk string, jsonStr string) []string { var mymap map[string]interface{} + var keys []string if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { //组装pkArray,用来update enable_flag switch (mymap[pk]).(type) { @@ -118,7 +120,6 @@ func addRecord(pk string, jsonStr string) { } //另一个任务是组装insertStrArray,准备批量提交 //拿到key - var keys []string for k := range mymap { keys = append(keys, k) } @@ -141,50 +142,38 @@ func addRecord(pk string, jsonStr string) { break } } - lineSql = lineSql[0 : len(lineSql)-1] + lineSql+="'"+CommonUtil.GetUUID()+"'" insertStrArray = append(insertStrArray, lineSql) } else { fmt.Println(err.Error()) } + return keys } //提交 -func batchSave(tableName string, pkName string) { - //fmt.Println(pkStrArray) - //fmt.Println(insertStrArray) +func batchSave(tableName string, pkName string, keys []string) { var pkStr = "" for i := range pkStrArray { pkStr += "'" + pkStrArray[i] + "'," } pkStr = pkStr[0 : len(pkStr)-1] - sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr+")" + sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr + ")" pgDb.SQL(sql).Execute() - /* - //插入 - sql = `insert into ` + tableName + "(" - var lineSql = "" - //根据key从m中拿元素,就是按顺序拿了 - for _, k := range keys { - sql += k + "," - switch mymap[k].(type) { - case string: - lineSql += "'" + mymap[k].(string) + "'," - break - case int64: - lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + "," - break - case float64: - lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + "," - break - default: - break - } - } - lineSql = lineSql[0 : len(lineSql)-1] - sql = sql[0 : len(sql)-1] - sql += ") values(" + lineSql + ");" - } else { - fmt.Println(err.Error()) - */ + //插入 + sql = `insert into ` + tableName + "(" + //根据key从m中拿元素,就是按顺序拿了 + for _, k := range keys { + sql += k + "," + } + sql+="uuid" + + var lineSql = "" + for i := range insertStrArray { + lineSql += "(" + insertStrArray[i] + ")," + } + lineSql = lineSql[0 : len(lineSql)-1] + sql += ") values " + lineSql + ";" + pgDb.SQL(sql).Execute() + fmt.Println("批量执行"+ CommonUtil.ConvertIntToString(len(insertStrArray))+"条.") }