// Command main bootstraps the t_dw_table_field metadata for one Elasticsearch
// index: it samples a document from the index, reads the JSON object stored in
// its "data_content" field, and records one field row per key of that object.
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"sort"
	"time"

	"github.com/olivere/elastic/v7"
	"github.com/tidwall/gjson"

	"dsSupport/Utils/DbUtil"
	"dsSupport/models"
)

// Background reading on deep pagination (scroll) in Elasticsearch:
// https://www.cnblogs.com/hello-shf/p/11543453.html

// client is not referenced in this file; it is kept because other files of the
// package may depend on it.
var client elastic.Client

// db is the database engine exposed by the DbUtil package.
var db = DbUtil.Engine

// main connects to Elasticsearch, fetches the first scroll page of the index
// and, when the last hit of that page carries a non-empty "data_content"
// object, hands that object to addFieldData to (re)create the field rows.
//
// Connection and query failures panic. An empty index or a sample document
// without usable "data_content" is logged and treated as "nothing to do".
func main() {
	const host = "http://10.10.14.188:9200/"
	esClient, err := elastic.NewClient(
		elastic.SetURL(host),
		elastic.SetSniff(false),
		elastic.SetHealthcheckInterval(10*time.Second),
		elastic.SetGzip(true),
		elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
		elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
	)
	if err != nil {
		panic(err)
	}

	// Name of the index to sync; it is also used as the table name.
	indexName := "user_student"

	ctx := context.Background()
	scroll := esClient.Scroll().Index(indexName).Size(100)
	result, err := scroll.Do(ctx)
	// The scroll service reports "no (more) documents" as io.EOF; that is not
	// a failure for this tool.
	if errors.Is(err, io.EOF) {
		log.Printf("index %q returned no documents; nothing to initialize", indexName)
		return
	}
	if err != nil {
		panic(err)
	}
	// Only the first page is needed, so release the server-side scroll
	// context instead of waiting for its keep-alive to expire.
	defer func() {
		if err := scroll.Clear(ctx); err != nil {
			log.Printf("clearing scroll for index %q: %v", indexName, err)
		}
	}()

	hits := result.Hits.Hits
	if len(hits) == 0 {
		log.Printf("index %q returned no documents; nothing to initialize", indexName)
		return
	}

	// The last hit of the first page is the sample document whose
	// "data_content" keys define the table's fields.
	sample := hits[len(hits)-1]
	raw, err := json.Marshal(sample.Source)
	if err != nil {
		panic(fmt.Errorf("encoding source of sample document: %w", err))
	}
	content := gjson.Get(string(raw), "data_content")

	fields := make(map[string]interface{})
	if err := json.Unmarshal([]byte(content.String()), &fields); err != nil {
		log.Printf("sample document of index %q has no usable data_content: %v", indexName, err)
		return
	}
	if len(fields) == 0 {
		log.Printf("sample document of index %q has an empty data_content; nothing to initialize", indexName)
		return
	}
	addFieldData(indexName, fields)
}

// addFieldData rebuilds the t_dw_table_field rows of the table registered
// under tableName in t_dw_table.
//
// It looks up the table_id, deletes every existing field row for that id and
// then inserts one row per key of m. A float64 value is recorded as "int"
// with length 4, a string value as "varchar" with length 128; for any other
// value type the data type, length and decimal length are left at their zero
// values. Keys are inserted in sorted order so repeated runs produce the same
// row order.
//
// Any database failure, a missing t_dw_table entry, or a table_id that is not
// an int64 causes a panic.
func addFieldData(tableName string, m map[string]interface{}) {
	// Resolve the table_id.
	query := `select table_id from t_dw_table where table_name=?`
	list, err := db.SQL(query, tableName).Query().List()
	if err != nil {
		panic(fmt.Errorf("looking up table_id of %q: %w", tableName, err))
	}
	if len(list) == 0 {
		panic(fmt.Errorf("table %q is not registered in t_dw_table", tableName))
	}
	tableID, ok := list[0]["table_id"].(int64)
	if !ok {
		panic(fmt.Errorf("table_id of %q has unexpected type %T", tableName, list[0]["table_id"]))
	}

	// Delete first, then insert. A failed delete must stop the run, otherwise
	// the new rows would be added on top of the stale ones.
	query = `delete from t_dw_table_field where table_id=?`
	if _, err := db.SQL(query, tableID).Execute(); err != nil {
		panic(fmt.Errorf("deleting existing fields of table %q: %w", tableName, err))
	}

	// Map iteration order is random; sort the keys for a stable insert order.
	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		model := new(models.TDwTableField)
		model.TableId = int32(tableID)
		model.FieldName = name
		switch m[name].(type) {
		case float64:
			// encoding/json decodes every JSON number as float64.
			model.DataType = "int"
			model.FieldLength = 4
			model.DecimalPointLength = 0
		case string:
			model.DataType = "varchar"
			model.FieldLength = 128
			model.DecimalPointLength = 0
		}
		if _, err := db.Insert(model); err != nil {
			panic(err)
		}
	}
	fmt.Println("完成现有Es Mapping的结构反向初始化工作,将手工修改表t_dw_table_field中数据!尤其是主键,一定要设置啊!")
}