package main

import (
	"context"
	"dsSupport/Utils/CommonUtil"
	"dsSupport/Utils/PgUtil"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/olivere/elastic/v7"
	"github.com/tidwall/gjson"
)

// Deep pagination via the ES scroll API.
// TODO:
// Record the last sync time of each index in a log file; for now this is a one-off full sync.
// Because rows are kept by business primary key, the warehouse always holds less data than ES:
// update log records are never synced, only the final effective business IDs are carried over.
// https://www.cnblogs.com/hello-shf/p/11543453.html

var db = PgUtil.Engine

func main() {
	var host = "http://10.10.14.188:9200/"
	esClient, err := elastic.NewClient(
		elastic.SetURL(host),
		elastic.SetSniff(false),
		elastic.SetHealthcheckInterval(10*time.Second),
		elastic.SetGzip(true),
		elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
		elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
	)
	if err != nil {
		panic(err)
	}

	// Name of the index to sync, which is also the target table name.
	indexName := "org_school"

	// Look up the table's primary-key column from the PostgreSQL catalog.
	sql := `SELECT pg_constraint.conname AS pk_name,
       pg_attribute.attname  AS colname,
       pg_type.typname       AS typename
  FROM pg_constraint
 INNER JOIN pg_class ON pg_constraint.conrelid = pg_class.oid
 INNER JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid
   AND pg_attribute.attnum = pg_constraint.conkey[1]
 INNER JOIN pg_type ON pg_type.oid = pg_attribute.atttypid
 WHERE pg_class.relname = ? AND pg_constraint.contype = 'p'`
	list, err := db.SQL(sql, indexName).Query().List()
	if err != nil || len(list) == 0 {
		panic("cannot determine the primary key of table " + indexName)
	}
	pk := list[0]["colname"].(string)

	// Scroll through all documents of the index.
	ctx := context.Background()
	result, err := esClient.Scroll().Index(indexName).Size(10).Do(ctx)
	if err != nil {
		panic(err)
	}
	// First batch of hits.
	for i := range result.Hits.Hits {
		resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
		resStr := string(resByte)
		value := gjson.Get(resStr, "data_content")
		addRecord(indexName, pk, value.String())
	}
	// Scroll ID used for the follow-up requests.
	scrollId := result.ScrollId
	// Number of hits fetched so far.
	var nowCount = int64(len(result.Hits.Hits))
	// Total number of hits.
	allCount := result.TotalHits()
	// Keep scrolling.
	for {
		// Some data has not been fetched yet.
		if allCount > nowCount {
			result, err = esClient.Scroll().ScrollId(scrollId).Do(ctx)
			if err != nil {
				panic(err)
			}
			scrollId = result.ScrollId
			nowCount += int64(len(result.Hits.Hits))
			for i := range result.Hits.Hits {
				resByte, _ := json.Marshal(result.Hits.Hits[i].Source)
				resStr := string(resByte)
				value := gjson.Get(resStr, "data_content")
				addRecord(indexName, pk, value.String())
			}
		} else {
			// No data left.
			break
		}
	}
}

// TODO: later, commit in batches once a threshold is reached; tentatively 200 rows per batch.
func addRecord(tableName string, pk string, jsonStr string) {
	var mymap map[string]interface{}
	var sql = ""
	if err := json.Unmarshal([]byte(jsonStr), &mymap); err != nil {
		fmt.Println(err.Error())
		return
	}
	if len(mymap) == 0 {
		return
	}
	// Delete first, then insert.
	sql = `delete from ` + tableName + ` where ` + pk + `=?`
	switch pkVal := mymap[pk].(type) {
	case string:
		if _, err := db.SQL(sql, pkVal).Execute(); err != nil {
			panic(err.Error())
		}
	case float64:
		if _, err := db.SQL(sql, int64(pkVal)).Execute(); err != nil {
			panic(err.Error())
		}
	}
	// Insert.
	// NOTE: values are concatenated into the statement without escaping,
	// so string values containing single quotes will break the SQL.
	sql = `insert into ` + tableName + "("
	var lineSql = ""
	for key, value := range mymap {
		sql += key + ","
		switch v := value.(type) {
		case string:
			lineSql += "'" + v + "',"
		case int64:
			lineSql += CommonUtil.ConvertInt64ToString(v) + ","
		case float64:
			// JSON numbers arrive as float64; they are treated as integers here.
			lineSql += fmt.Sprintf("%d", int64(v)) + ","
		}
	}
	lineSql = lineSql[0 : len(lineSql)-1]
	sql = sql[0 : len(sql)-1]
	sql += ") values(" + lineSql + ");"
	fmt.Println(sql)
	if _, err := db.SQL(sql).Execute(); err != nil {
		panic(err.Error())
	}
	fmt.Println("Inserted one row successfully!")
}
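
// The TODO on addRecord suggests committing in batches of roughly 200 rows instead of
// one round trip per record. The sketch below is one possible shape for that and is not
// part of the original sync job: queueRecord, flushPending, pendingSQL and batchSize are
// hypothetical names, and it assumes the xorm-style Engine can execute several
// semicolon-terminated statements in one Execute call. addRecord would pass its final
// statement to queueRecord instead of executing it directly, and main would call
// flushPending once after the scroll loop finishes so the last partial batch is written.

// pendingSQL buffers statements until the batch threshold is reached.
var pendingSQL []string

// batchSize is the tentative threshold taken from the TODO comment.
const batchSize = 200

// queueRecord buffers one statement and flushes when the buffer is full.
func queueRecord(stmt string) {
	pendingSQL = append(pendingSQL, stmt)
	if len(pendingSQL) >= batchSize {
		flushPending()
	}
}

// flushPending sends all buffered statements to PostgreSQL in one call and clears the buffer.
func flushPending() {
	if len(pendingSQL) == 0 {
		return
	}
	batch := ""
	for _, stmt := range pendingSQL {
		batch += stmt
	}
	if _, err := db.SQL(batch).Execute(); err != nil {
		panic(err.Error())
	}
	pendingSQL = pendingSQL[:0]
}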