diff --git a/dsSupport/Test/Elasticsearch/ReadElasticsearch.go b/dsSupport/Test/Elasticsearch/ReadElasticsearch.go index 1c9f527f..52c23421 100644 --- a/dsSupport/Test/Elasticsearch/ReadElasticsearch.go +++ b/dsSupport/Test/Elasticsearch/ReadElasticsearch.go @@ -2,9 +2,11 @@ package main import ( "context" + "dsSupport/Utils/GpUtil" "encoding/json" "fmt" "github.com/olivere/elastic/v7" + "github.com/tidwall/gjson" "log" "os" "time" @@ -14,6 +16,7 @@ import ( // https://www.cnblogs.com/hello-shf/p/11543453.html var client elastic.Client +var db = GpUtil.Engine func main() { var host = "http://10.10.14.188:9200/" @@ -23,23 +26,34 @@ func main() { elastic.SetHealthcheckInterval(10*time.Second), elastic.SetGzip(true), elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), - elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags))) + //elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), + ) if err != nil { panic(err) } + //要同步的索引名称,也就是表名称 + indexName := "org_school" //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index("user_teacher").Size(10).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) if err != nil { - fmt.Println(err.Error()) + panic(err) } //第一次的结果集 for i := range result.Hits.Hits { - resByte, _ := json.Marshal(result.Hits.Hits[i]) + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) - fmt.Println(resStr) + value := gjson.Get(resStr, "data_content") + //判断表是不是已存在 + + //如果是第一次的话而且表不存在的情况下,重新创建表结构 + m := make(map[string]interface{}) + json.Unmarshal([]byte(value.String()), &m) + if len(m) > 0 && i == 0 { + CreateTable(indexName, m) + } } //继续用的 scoll_id scrollId := result.ScrollId @@ -51,7 +65,6 @@ func main() { //开始循环 for { - fmt.Println(nowCount) //如果还有数据没有获取 if allCount > nowCount { result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) @@ -59,9 +72,13 @@ func main() { nowCount += int64(len(result.Hits.Hits)) for i := range result.Hits.Hits { - resByte, _ := json.Marshal(result.Hits.Hits[i]) - resStr := string(resByte) - fmt.Println(resStr) + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + jsonStr := string(resByte) + m := make(map[string]interface{}) + json.Unmarshal([]byte(jsonStr), &m) + for k, v := range m { + fmt.Printf("%v: %v\n", k, v) + } } } else { //没有数据了 @@ -69,3 +86,25 @@ func main() { } } } + +/** +功能:创建一张INDEX对应的表 +*/ +func CreateTable(tableName string, m map[string]interface{}) { + createTableSql := `CREATE TABLE "public"."` + tableName + `" (` + "\r\n" + var c = "" + for k, v := range m { + switch v.(type) { + case float64: + c = "float8" + case string: + c = "varchar(128) " + ` COLLATE "pg_catalog"."default"` + } + createTableSql += `"` + k + `" ` + c + ",\r\n" + } + createTableSql = createTableSql[0 : len(createTableSql)-3] + createTableSql += "\r\n" + `);` + "\r\n" + //创建表 + db.SQL(createTableSql).Execute() + fmt.Println("成功创建表:"+tableName+"~") +} diff --git a/dsSupport/Utils/GpUtil/GpUtil.go b/dsSupport/Utils/GpUtil/GpUtil.go new file mode 100644 index 00000000..7b3e03af --- /dev/null +++ b/dsSupport/Utils/GpUtil/GpUtil.go @@ -0,0 +1,44 @@ +package GpUtil + +import ( + "dsSupport/Const/ErrorConst" + "dsSupport/Utils/LogUtil" + "fmt" + _ "github.com/lib/pq" + "github.com/xormplus/core" + "github.com/xormplus/xorm" + "github.com/xormplus/xorm/log" + "os" + "time" +) + +var Engine *xorm.Engine +var err error + +func init() { + //postgresql + psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", "10.10.14.107", 5432, "dsideal", "DsideaL147258369", "base_db_dev") + //格式 + Engine, err = xorm.NewEngine("postgres", psqlInfo) + if err != nil { + LogUtil.Error(ErrorConst.SqlQueryError, err.Error()) + } + //设置数据库连接池 + Engine.SetMaxIdleConns(200) //设置连接池中的保持连接的最大连接数。 + Engine.SetMaxOpenConns(200) //设置打开数据库的最大连接数,包含正在使用的连接和连接池的连接。 + Engine.SetConnMaxLifetime(time.Minute * 5) + //设置数据时区 + var location *time.Location + location, _ = time.LoadLocation("Asia/Shanghai") + Engine.TZLocation = location + //与 struct的映射方式,这里采用蛇型方法,默认是蛇形 + Engine.SetTableMapper(core.SnakeMapper{}) + + //显示+记录SQL日志 + f, _ := os.Create("./Logs/pg_sql.log") + + logger := log.NewSimpleLogger(f) + logger.SetLevel(log.LOG_DEBUG) + Engine.SetLogger(log.NewLoggerAdapter(logger)) + Engine.ShowSQL(true) // 则会在控制台打印出生成的SQL语句 +} diff --git a/dsSupport/go.mod b/dsSupport/go.mod index 77ac3328..c7955465 100644 --- a/dsSupport/go.mod +++ b/dsSupport/go.mod @@ -18,6 +18,7 @@ require ( github.com/go-sql-driver/mysql v1.5.0 github.com/golang/protobuf v1.4.1 github.com/golang/snappy v0.0.1 // indirect + github.com/lib/pq v1.8.0 github.com/oklog/ulid v1.3.1 github.com/olivere/elastic/v7 v7.0.20 github.com/pkg/sftp v1.12.0 @@ -26,6 +27,7 @@ require ( github.com/swaggo/gin-swagger v1.2.0 github.com/syndtr/goleveldb v1.0.0 // indirect github.com/tealeg/xlsx v1.0.5 // indirect + github.com/tidwall/gjson v1.6.1 // indirect github.com/xormplus/builder v0.0.0-20200331055651-240ff40009be github.com/xormplus/core v0.0.0-20200308074340-f3bce19d5f31 github.com/xormplus/xorm v0.0.0-20200731130200-6811f3bde592 diff --git a/dsSupport/go.sum b/dsSupport/go.sum index 24781677..465f75cc 100644 --- a/dsSupport/go.sum +++ b/dsSupport/go.sum @@ -105,6 +105,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg= +github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= @@ -155,6 +157,12 @@ github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFd github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/tealeg/xlsx v1.0.5 h1:+f8oFmvY8Gw1iUXzPk+kz+4GpbDZPK1FhPiQRd+ypgE= github.com/tealeg/xlsx v1.0.5/go.mod h1:btRS8dz54TDnvKNosuAqxrM1QgN1udgk9O34bDCnORM= +github.com/tidwall/gjson v1.6.1 h1:LRbvNuNuvAiISWg6gxLEFuCe72UKy5hDqhxW/8183ws= +github.com/tidwall/gjson v1.6.1/go.mod h1:BaHyNc5bjzYkPqgLq7mdVzeiRtULKULXLgZFKsxEHI0= +github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= +github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= +github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU= +github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.5-pre/go.mod h1:FwP/aQVg39TXzItUBMwnWp9T9gPQnXw4Poh4/oBQZ/0= github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=