From f9af69fe22c6da532a0a1955759dfc4e193358d7 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 12:02:18 +0800 Subject: [PATCH 01/20] 'commit' --- .../Test/Elasticsearch/ReadElasticsearch.go | 57 ++++++++++++++++--- dsSupport/Utils/GpUtil/GpUtil.go | 44 ++++++++++++++ dsSupport/go.mod | 2 + dsSupport/go.sum | 8 +++ 4 files changed, 102 insertions(+), 9 deletions(-) create mode 100644 dsSupport/Utils/GpUtil/GpUtil.go diff --git a/dsSupport/Test/Elasticsearch/ReadElasticsearch.go b/dsSupport/Test/Elasticsearch/ReadElasticsearch.go index 1c9f527f..52c23421 100644 --- a/dsSupport/Test/Elasticsearch/ReadElasticsearch.go +++ b/dsSupport/Test/Elasticsearch/ReadElasticsearch.go @@ -2,9 +2,11 @@ package main import ( "context" + "dsSupport/Utils/GpUtil" "encoding/json" "fmt" "github.com/olivere/elastic/v7" + "github.com/tidwall/gjson" "log" "os" "time" @@ -14,6 +16,7 @@ import ( // https://www.cnblogs.com/hello-shf/p/11543453.html var client elastic.Client +var db = GpUtil.Engine func main() { var host = "http://10.10.14.188:9200/" @@ -23,23 +26,34 @@ func main() { elastic.SetHealthcheckInterval(10*time.Second), elastic.SetGzip(true), elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), - elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags))) + //elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), + ) if err != nil { panic(err) } + //要同步的索引名称,也就是表名称 + indexName := "org_school" //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index("user_teacher").Size(10).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) if err != nil { - fmt.Println(err.Error()) + panic(err) } //第一次的结果集 for i := range result.Hits.Hits { - resByte, _ := json.Marshal(result.Hits.Hits[i]) + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) - fmt.Println(resStr) + value := gjson.Get(resStr, "data_content") + //判断表是不是已存在 + + //如果是第一次的话而且表不存在的情况下,重新创建表结构 + m := make(map[string]interface{}) + json.Unmarshal([]byte(value.String()), &m) + if len(m) > 0 && i == 0 { + CreateTable(indexName, m) + } } //继续用的 scoll_id scrollId := result.ScrollId @@ -51,7 +65,6 @@ func main() { //开始循环 for { - fmt.Println(nowCount) //如果还有数据没有获取 if allCount > nowCount { result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) @@ -59,9 +72,13 @@ func main() { nowCount += int64(len(result.Hits.Hits)) for i := range result.Hits.Hits { - resByte, _ := json.Marshal(result.Hits.Hits[i]) - resStr := string(resByte) - fmt.Println(resStr) + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + jsonStr := string(resByte) + m := make(map[string]interface{}) + json.Unmarshal([]byte(jsonStr), &m) + for k, v := range m { + fmt.Printf("%v: %v\n", k, v) + } } } else { //没有数据了 @@ -69,3 +86,25 @@ func main() { } } } + +/** +功能:创建一张INDEX对应的表 +*/ +func CreateTable(tableName string, m map[string]interface{}) { + createTableSql := `CREATE TABLE "public"."` + tableName + `" (` + "\r\n" + var c = "" + for k, v := range m { + switch v.(type) { + case float64: + c = "float8" + case string: + c = "varchar(128) " + ` COLLATE "pg_catalog"."default"` + } + createTableSql += `"` + k + `" ` + c + ",\r\n" + } + createTableSql = createTableSql[0 : len(createTableSql)-3] + createTableSql += "\r\n" + `);` + "\r\n" + //创建表 + db.SQL(createTableSql).Execute() + fmt.Println("成功创建表:"+tableName+"~") +} diff --git a/dsSupport/Utils/GpUtil/GpUtil.go b/dsSupport/Utils/GpUtil/GpUtil.go new file mode 100644 index 00000000..7b3e03af --- /dev/null +++ b/dsSupport/Utils/GpUtil/GpUtil.go @@ -0,0 +1,44 @@ +package GpUtil + +import ( + "dsSupport/Const/ErrorConst" + "dsSupport/Utils/LogUtil" + "fmt" + _ "github.com/lib/pq" + "github.com/xormplus/core" + "github.com/xormplus/xorm" + "github.com/xormplus/xorm/log" + "os" + "time" +) + +var Engine *xorm.Engine +var err error + +func init() { + //postgresql + psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", "10.10.14.107", 5432, "dsideal", "DsideaL147258369", "base_db_dev") + //格式 + Engine, err = xorm.NewEngine("postgres", psqlInfo) + if err != nil { + LogUtil.Error(ErrorConst.SqlQueryError, err.Error()) + } + //设置数据库连接池 + Engine.SetMaxIdleConns(200) //设置连接池中的保持连接的最大连接数。 + Engine.SetMaxOpenConns(200) //设置打开数据库的最大连接数,包含正在使用的连接和连接池的连接。 + Engine.SetConnMaxLifetime(time.Minute * 5) + //设置数据时区 + var location *time.Location + location, _ = time.LoadLocation("Asia/Shanghai") + Engine.TZLocation = location + //与 struct的映射方式,这里采用蛇型方法,默认是蛇形 + Engine.SetTableMapper(core.SnakeMapper{}) + + //显示+记录SQL日志 + f, _ := os.Create("./Logs/pg_sql.log") + + logger := log.NewSimpleLogger(f) + logger.SetLevel(log.LOG_DEBUG) + Engine.SetLogger(log.NewLoggerAdapter(logger)) + Engine.ShowSQL(true) // 则会在控制台打印出生成的SQL语句 +} diff --git a/dsSupport/go.mod b/dsSupport/go.mod index 77ac3328..c7955465 100644 --- a/dsSupport/go.mod +++ b/dsSupport/go.mod @@ -18,6 +18,7 @@ require ( github.com/go-sql-driver/mysql v1.5.0 github.com/golang/protobuf v1.4.1 github.com/golang/snappy v0.0.1 // indirect + github.com/lib/pq v1.8.0 github.com/oklog/ulid v1.3.1 github.com/olivere/elastic/v7 v7.0.20 github.com/pkg/sftp v1.12.0 @@ -26,6 +27,7 @@ require ( github.com/swaggo/gin-swagger v1.2.0 github.com/syndtr/goleveldb v1.0.0 // indirect github.com/tealeg/xlsx v1.0.5 // indirect + github.com/tidwall/gjson v1.6.1 // indirect github.com/xormplus/builder v0.0.0-20200331055651-240ff40009be github.com/xormplus/core v0.0.0-20200308074340-f3bce19d5f31 github.com/xormplus/xorm v0.0.0-20200731130200-6811f3bde592 diff --git a/dsSupport/go.sum b/dsSupport/go.sum index 24781677..465f75cc 100644 --- a/dsSupport/go.sum +++ b/dsSupport/go.sum @@ -105,6 +105,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg= +github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= @@ -155,6 +157,12 @@ github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFd github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/tealeg/xlsx v1.0.5 h1:+f8oFmvY8Gw1iUXzPk+kz+4GpbDZPK1FhPiQRd+ypgE= github.com/tealeg/xlsx v1.0.5/go.mod h1:btRS8dz54TDnvKNosuAqxrM1QgN1udgk9O34bDCnORM= +github.com/tidwall/gjson v1.6.1 h1:LRbvNuNuvAiISWg6gxLEFuCe72UKy5hDqhxW/8183ws= +github.com/tidwall/gjson v1.6.1/go.mod h1:BaHyNc5bjzYkPqgLq7mdVzeiRtULKULXLgZFKsxEHI0= +github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= +github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= +github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU= +github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.5-pre/go.mod h1:FwP/aQVg39TXzItUBMwnWp9T9gPQnXw4Poh4/oBQZ/0= github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= From 67955679124ca09335d13130b99b14c64aa76b91 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 13:03:40 +0800 Subject: [PATCH 02/20] 'commit' --- ...ch.go => ReadElasticsearchDynamicTable.go} | 0 .../ReadElasticsearchToCsv.go | 99 +++++++++++++++++++ 2 files changed, 99 insertions(+) rename dsSupport/Test/Elasticsearch/{ReadElasticsearch.go => ReadElasticsearchDynamicTable.go} (100%) create mode 100644 dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go diff --git a/dsSupport/Test/Elasticsearch/ReadElasticsearch.go b/dsSupport/Test/Elasticsearch/ReadElasticsearchDynamicTable.go similarity index 100% rename from dsSupport/Test/Elasticsearch/ReadElasticsearch.go rename to dsSupport/Test/Elasticsearch/ReadElasticsearchDynamicTable.go diff --git a/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go b/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go new file mode 100644 index 00000000..0c872fea --- /dev/null +++ b/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go @@ -0,0 +1,99 @@ +package main + +import ( + "context" + "dsSupport/Utils/GpUtil" + "encoding/csv" + "encoding/json" + "fmt" + "github.com/olivere/elastic/v7" + "github.com/tidwall/gjson" + "log" + "os" + "time" +) + +// 深度分页 +// https://www.cnblogs.com/hello-shf/p/11543453.html + +var client elastic.Client +var db = GpUtil.Engine + +func main() { + var host = "http://10.10.14.188:9200/" + esClient, err := elastic.NewClient( + elastic.SetURL(host), + elastic.SetSniff(false), + elastic.SetHealthcheckInterval(10*time.Second), + elastic.SetGzip(true), + elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), + elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), + ) + + if err != nil { + panic(err) + } + + //要同步的索引名称,也就是表名称 + indexName := "org_school" + //取所有 + CTX := context.Background() + result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) + if err != nil { + panic(err) + } + //第一次的结果集 + for i := range result.Hits.Hits { + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + resStr := string(resByte) + value := gjson.Get(resStr, "data_content") + fmt.Println(value.String()) + } + //继续用的 scoll_id + scrollId := result.ScrollId + + //第一次命中的个数 + var nowCount = int64(len(result.Hits.Hits)) + //总的命中个数 + allCount := result.TotalHits() + + //开始循环 + for { + //如果还有数据没有获取 + if allCount > nowCount { + result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) + scrollId = result.ScrollId + nowCount += int64(len(result.Hits.Hits)) + + for i := range result.Hits.Hits { + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + jsonStr := string(resByte) + m := make(map[string]interface{}) + json.Unmarshal([]byte(jsonStr), &m) + for k, v := range m { + fmt.Printf("%v: %v\n", k, v) + } + } + } else { + //没有数据了 + break + } + } +} + +func SaveToCsv(csvFileName string, _map []map[string]interface{}) { + file, err := os.OpenFile(csvFileName, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + fmt.Println("open file is failed, err: ", err) + } + defer file.Close() + // 写入UTF-8 BOM,防止中文乱码 + file.WriteString("\xEF\xBB\xBF") + w := csv.NewWriter(file) + for i := range _map { + _map[i] + } + w.Write([]string{"开发者名称", "开发者邮箱", "应用名称"}) + // 写文件需要flush,不然缓存满了,后面的就写不进去了,只会写一部分 + w.Flush() +} From b399be8c032a59b0dae160e727acf1e6c3b269ba Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 13:36:08 +0800 Subject: [PATCH 03/20] 'commit' --- dsBaseRpc/models/t_app_base.go | 1 + dsBaseRpc/models/t_base_organization_log.go | 2 +- dsBaseRpc/models/t_base_teacher_position.go | 2 +- dsBaseRpc/models/t_dataex_dataaccess.go | 2 +- dsBaseRpc/models/t_dataex_error.go | 5 +- dsBaseRpc/models/t_dataex_jyt2012.go | 1 + dsBaseRpc/models/t_dw_table.go | 6 + dsBaseRpc/models/t_dw_table_field.go | 13 +++ dsSupport/Test/CreatePgTable/CreatePgTable.go | 29 ++--- .../ReadElasticsearchDynamicTable.go | 110 ------------------ .../ReadElasticsearchToCsv.go | 10 +- .../ReadEsExistMapping/ReadEsExistMapping.go | 89 ++++++++++++++ .../{GpUtil/GpUtil.go => PgUtil/PgUtil.go} | 2 +- dsSupport/go.mod | 2 +- dsSupport/models/t_dw_table.go | 6 + dsSupport/models/t_dw_table_field.go | 13 +++ 16 files changed, 158 insertions(+), 135 deletions(-) create mode 100644 dsBaseRpc/models/t_dw_table.go create mode 100644 dsBaseRpc/models/t_dw_table_field.go delete mode 100644 dsSupport/Test/Elasticsearch/ReadElasticsearchDynamicTable.go create mode 100644 dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go rename dsSupport/Utils/{GpUtil/GpUtil.go => PgUtil/PgUtil.go} (98%) create mode 100644 dsSupport/models/t_dw_table.go create mode 100644 dsSupport/models/t_dw_table_field.go diff --git a/dsBaseRpc/models/t_app_base.go b/dsBaseRpc/models/t_app_base.go index 4a63ff48..4e608af8 100644 --- a/dsBaseRpc/models/t_app_base.go +++ b/dsBaseRpc/models/t_app_base.go @@ -6,6 +6,7 @@ type TAppBase struct { AppName string `xorm:"not null comment('系统名称') VARCHAR(100)"` AccessKey string `xorm:"not null comment('AK') CHAR(20)"` SecretKey string `xorm:"not null comment('SK') CHAR(26)"` + AppToken string `xorm:"default 'NULL' comment('系统票据') CHAR(100)"` AppUrl string `xorm:"default 'NULL' comment('系统集成页面调用地址') VARCHAR(1024)"` AppIcon string `xorm:"default 'NULL' comment('系统图标的路径') VARCHAR(1024)"` RedirectUri string `xorm:"default 'NULL' comment('统一认证回调地址') VARCHAR(1024)"` diff --git a/dsBaseRpc/models/t_base_organization_log.go b/dsBaseRpc/models/t_base_organization_log.go index a48e3df0..148ce0a8 100644 --- a/dsBaseRpc/models/t_base_organization_log.go +++ b/dsBaseRpc/models/t_base_organization_log.go @@ -12,7 +12,7 @@ type TBaseOrganizationLog struct { MainSchoolType int32 `xorm:"not null default 1 comment('主校类型,1:普通校,2:主校,3:分校') INT(11)"` MainSchoolId string `xorm:"not null default '''' comment('如果是分校,所属主校的ID,如果是主校,是自己的ID,如果是分校,是主校ID,如果是普通校,也是自己的ID') CHAR(36)"` Xxbxlxm string `xorm:"not null default '''' comment('学校办学类型(学校专有属性)有字典') CHAR(3)"` - Szdcxlxm string `xorm:"not null default '''' comment('学校城乡类型(学校专有属性)有字典') CHAR(2)"` + Szdcxlxm string `xorm:"not null default '''' comment('学校城乡类型(学校专有属性)有字典') CHAR(3)"` Xxjbzm string `xorm:"not null default '''' comment('学校举办者(学校专有属性)有字典') CHAR(3)"` Fzr string `xorm:"not null default '''' comment('机构负责人ID(*一个单位只能有一个负责人)') CHAR(36)"` Fddbr string `xorm:"not null default '''' comment('机构法定代表人') VARCHAR(50)"` diff --git a/dsBaseRpc/models/t_base_teacher_position.go b/dsBaseRpc/models/t_base_teacher_position.go index 08d2db18..63f50e08 100644 --- a/dsBaseRpc/models/t_base_teacher_position.go +++ b/dsBaseRpc/models/t_base_teacher_position.go @@ -2,7 +2,7 @@ package models type TBaseTeacherPosition struct { Id string `xorm:"not null pk comment('主键') CHAR(36)"` - PersonId string `xorm:"not null default '''' comment('人员ID') index(person_id) CHAR(18)"` + PersonId string `xorm:"not null default '''' comment('人员ID') index(person_id) CHAR(36)"` BureauId string `xorm:"not null comment('所在单位ID') index(person_id) CHAR(36)"` PositionId string `xorm:"not null default ''0'' comment('职务ID') CHAR(36)"` BUse int32 `xorm:"not null default 1 comment('是否可用 1:可用 -2:不可用') index(person_id) TINYINT(1)"` diff --git a/dsBaseRpc/models/t_dataex_dataaccess.go b/dsBaseRpc/models/t_dataex_dataaccess.go index 1346808b..ae2bb76b 100644 --- a/dsBaseRpc/models/t_dataex_dataaccess.go +++ b/dsBaseRpc/models/t_dataex_dataaccess.go @@ -8,7 +8,7 @@ type TDataexDataaccess struct { Id string `xorm:"not null pk comment('ID') VARCHAR(36)"` DatasourceId string `xorm:"not null comment('数据源ID') index VARCHAR(36)"` DatasourceCode string `xorm:"default 'NULL' VARCHAR(255)"` - ConsumeSystemid string `xorm:"not null comment('数据使用系统ID') index VARCHAR(36)"` + ConsumeSystemid string `xorm:"default 'NULL' comment('数据使用系统ID') index CHAR(36)"` QueryFlag int32 `xorm:"not null default 1 comment('可查【1:是,-1:否】') INT(11)"` SetFlag int32 `xorm:"not null default -1 comment('可修改【1:是,-1:否】') INT(11)"` ConsumeType int32 `xorm:"not null comment('使用数据范围【1:本机构,2:本机构以及下属机构,-1:不限制】') INT(11)"` diff --git a/dsBaseRpc/models/t_dataex_error.go b/dsBaseRpc/models/t_dataex_error.go index e1daba47..2ecdf23a 100644 --- a/dsBaseRpc/models/t_dataex_error.go +++ b/dsBaseRpc/models/t_dataex_error.go @@ -8,10 +8,11 @@ type TDataexError struct { Id string `xorm:"not null pk comment('ID') VARCHAR(36)"` SystemId string `xorm:"not null comment('数据提供系统ID【输入参数】') VARCHAR(36)"` DatasourceId string `xorm:"not null comment('数据源ID【输入参数】') VARCHAR(36)"` - OrgId string `xorm:"not null comment('数据机构ID【输入参数】') VARCHAR(36)"` - DataId string `xorm:"not null comment('数据ID【输入参数】') VARCHAR(36)"` + OrgId string `xorm:"default 'NULL' comment('数据机构ID【输入参数】') VARCHAR(36)"` + DataId string `xorm:"default 'NULL' comment('数据ID【输入参数】') VARCHAR(36)"` DataContent string `xorm:"default 'NULL' comment('数据内容【Json格式输入参数】') LONGTEXT"` FileUri string `xorm:"default 'NULL' comment('文件地址【文件交换】') VARCHAR(500)"` + ErrorInfo string `xorm:"default 'NULL' comment('错误信息说明') VARCHAR(5000)"` ChangeTime time.Time `xorm:"default 'NULL' comment('最近修改时间') DATETIME"` DeleteTime time.Time `xorm:"default 'NULL' comment('删除时间') DATETIME"` EnableFlag int32 `xorm:"not null default 1 comment('启用标志【默认1,1:启用,-1:禁用】') INT(11)"` diff --git a/dsBaseRpc/models/t_dataex_jyt2012.go b/dsBaseRpc/models/t_dataex_jyt2012.go index 47c395d0..515e6e94 100644 --- a/dsBaseRpc/models/t_dataex_jyt2012.go +++ b/dsBaseRpc/models/t_dataex_jyt2012.go @@ -7,6 +7,7 @@ import ( type TDataexJyt2012 struct { Id string `xorm:"not null pk comment('ID') VARCHAR(36)"` DicName string `xorm:"not null comment('字典/字典项名称') VARCHAR(100)"` + DicValue string `xorm:"not null VARCHAR(100)"` DicType int32 `xorm:"not null default 1 comment('字典类型【1:国标数据、关联数据源,2:国标字典、关联元数据】') INT(11)"` DicInfo string `xorm:"default ''NULL'' comment('字典相信说明') VARCHAR(500)"` RootFlag int32 `xorm:"not null default -1 comment('是否是字典【1:是,-1:否】') INT(11)"` diff --git a/dsBaseRpc/models/t_dw_table.go b/dsBaseRpc/models/t_dw_table.go new file mode 100644 index 00000000..bc01a486 --- /dev/null +++ b/dsBaseRpc/models/t_dw_table.go @@ -0,0 +1,6 @@ +package models + +type TDwTable struct { + TableId int32 `xorm:"not null pk autoincr INT(255)"` + TableName string `xorm:"not null VARCHAR(255)"` +} diff --git a/dsBaseRpc/models/t_dw_table_field.go b/dsBaseRpc/models/t_dw_table_field.go new file mode 100644 index 00000000..7d73919c --- /dev/null +++ b/dsBaseRpc/models/t_dw_table_field.go @@ -0,0 +1,13 @@ +package models + +type TDwTableField struct { + Id int32 `xorm:"not null pk autoincr INT(11)"` + TableId int32 `xorm:"not null comment('哪张表') index INT(11)"` + FieldName string `xorm:"not null comment('字段名称') VARCHAR(255)"` + DataType string `xorm:"not null comment('数据类型') VARCHAR(255)"` + FieldLength int32 `xorm:"not null comment('字段长度') INT(11)"` + DecimalPointLength int32 `xorm:"not null default 0 comment('小数点后长度') INT(11)"` + Comment string `xorm:"default 'NULL' comment('描述') VARCHAR(255)"` + IsPk int32 `xorm:"not null default 0 comment('是否为主键') INT(255)"` + IsNull int32 `xorm:"not null comment('是否可为空') INT(255)"` +} diff --git a/dsSupport/Test/CreatePgTable/CreatePgTable.go b/dsSupport/Test/CreatePgTable/CreatePgTable.go index 5584c509..d11a8eec 100644 --- a/dsSupport/Test/CreatePgTable/CreatePgTable.go +++ b/dsSupport/Test/CreatePgTable/CreatePgTable.go @@ -3,10 +3,12 @@ package main import ( "dsSupport/Utils/CommonUtil" "dsSupport/Utils/DbUtil" + "dsSupport/Utils/PgUtil" "fmt" ) var db = DbUtil.Engine +var pgDb = PgUtil.Engine func main() { //1、读取每一张需要创建的表 @@ -17,7 +19,9 @@ func main() { tableId := list[i]["table_id"].(int64) tableName := list[i]["table_name"].(string) createTableSql := `DROP TABLE IF EXISTS "public"."` + tableName + `";` + "\r\n" - createTableSql += `CREATE TABLE "public"."` + tableName + `" (` + "\r\n" + pgDb.SQL(createTableSql).Execute() + + createTableSql = `CREATE TABLE "public"."` + tableName + `" (` + "\r\n" //2、根据表名,获取相应的创建表的信息 sql = `select * from t_dw_table_field where table_id=?` list2, _ := db.SQL(sql, tableId).Query().List() @@ -35,20 +39,16 @@ func main() { if isNull == 0 { c += " NOT NULL," } else { - if i2 < len(list2)-1 { c += "," - } } createTableSql += `"` + fieldName + `" ` + c + "\r\n" } + createTableSql=createTableSql[0:len(createTableSql)-3] createTableSql += `);` + "\r\n" - //添加生成字段描述信息 - for i2 := range list2 { - fieldName := list2[i2]["field_name"].(string) - comment := list2[i2]["comment"] - if comment != nil && len(comment.(string)) > 0 { - createTableSql += `COMMENT ON COLUMN "public"."` + tableName + `"."` + fieldName + `" IS '` + comment.(string) + `';` + "\r\n" - } + _, err := pgDb.SQL(createTableSql).Execute() + if err != nil { + fmt.Println(createTableSql) + panic(err.Error()) } //主键有哪些 var pks = "" @@ -62,9 +62,12 @@ func main() { //去掉最后一个逗号 pks = pks[0 : len(pks)-1] //添加主键描述 - createTableSql += `-- 设置主键` + "\r\n" - createTableSql += `ALTER TABLE "public"."` + tableName + `" ADD CONSTRAINT "` + tableName + `_dw_pkey" PRIMARY KEY ("` + pks + `");` + "\r\n" + createTableSql = `ALTER TABLE "public"."` + tableName + `" ADD CONSTRAINT "` + tableName + `_dw_pkey" PRIMARY KEY ("` + pks + `");` + "\r\n" + _, err = pgDb.SQL(createTableSql).Execute() + if err != nil { + panic(err.Error()) + } - fmt.Println(createTableSql) + fmt.Println("恭喜,数据仓库中相应表格创建完毕!") } } diff --git a/dsSupport/Test/Elasticsearch/ReadElasticsearchDynamicTable.go b/dsSupport/Test/Elasticsearch/ReadElasticsearchDynamicTable.go deleted file mode 100644 index 52c23421..00000000 --- a/dsSupport/Test/Elasticsearch/ReadElasticsearchDynamicTable.go +++ /dev/null @@ -1,110 +0,0 @@ -package main - -import ( - "context" - "dsSupport/Utils/GpUtil" - "encoding/json" - "fmt" - "github.com/olivere/elastic/v7" - "github.com/tidwall/gjson" - "log" - "os" - "time" -) - -// 深度分页 -// https://www.cnblogs.com/hello-shf/p/11543453.html - -var client elastic.Client -var db = GpUtil.Engine - -func main() { - var host = "http://10.10.14.188:9200/" - esClient, err := elastic.NewClient( - elastic.SetURL(host), - elastic.SetSniff(false), - elastic.SetHealthcheckInterval(10*time.Second), - elastic.SetGzip(true), - elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), - //elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), - ) - - if err != nil { - panic(err) - } - - //要同步的索引名称,也就是表名称 - indexName := "org_school" - //取所有 - CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) - if err != nil { - panic(err) - } - //第一次的结果集 - for i := range result.Hits.Hits { - resByte, _ := json.Marshal(result.Hits.Hits[i].Source) - resStr := string(resByte) - value := gjson.Get(resStr, "data_content") - //判断表是不是已存在 - - //如果是第一次的话而且表不存在的情况下,重新创建表结构 - m := make(map[string]interface{}) - json.Unmarshal([]byte(value.String()), &m) - if len(m) > 0 && i == 0 { - CreateTable(indexName, m) - } - } - //继续用的 scoll_id - scrollId := result.ScrollId - - //第一次命中的个数 - var nowCount = int64(len(result.Hits.Hits)) - //总的命中个数 - allCount := result.TotalHits() - - //开始循环 - for { - //如果还有数据没有获取 - if allCount > nowCount { - result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) - scrollId = result.ScrollId - nowCount += int64(len(result.Hits.Hits)) - - for i := range result.Hits.Hits { - resByte, _ := json.Marshal(result.Hits.Hits[i].Source) - jsonStr := string(resByte) - m := make(map[string]interface{}) - json.Unmarshal([]byte(jsonStr), &m) - for k, v := range m { - fmt.Printf("%v: %v\n", k, v) - } - } - } else { - //没有数据了 - break - } - } -} - -/** -功能:创建一张INDEX对应的表 -*/ -func CreateTable(tableName string, m map[string]interface{}) { - createTableSql := `CREATE TABLE "public"."` + tableName + `" (` + "\r\n" - var c = "" - for k, v := range m { - switch v.(type) { - case float64: - c = "float8" - case string: - c = "varchar(128) " + ` COLLATE "pg_catalog"."default"` - } - createTableSql += `"` + k + `" ` + c + ",\r\n" - } - createTableSql = createTableSql[0 : len(createTableSql)-3] - createTableSql += "\r\n" + `);` + "\r\n" - //创建表 - db.SQL(createTableSql).Execute() - fmt.Println("成功创建表:"+tableName+"~") -} diff --git a/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go b/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go index 0c872fea..1e49f83b 100644 --- a/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go +++ b/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go @@ -2,7 +2,7 @@ package main import ( "context" - "dsSupport/Utils/GpUtil" + "dsSupport/Utils/PgUtil" "encoding/csv" "encoding/json" "fmt" @@ -17,7 +17,7 @@ import ( // https://www.cnblogs.com/hello-shf/p/11543453.html var client elastic.Client -var db = GpUtil.Engine +var db = PgUtil.Engine func main() { var host = "http://10.10.14.188:9200/" @@ -90,9 +90,9 @@ func SaveToCsv(csvFileName string, _map []map[string]interface{}) { // 写入UTF-8 BOM,防止中文乱码 file.WriteString("\xEF\xBB\xBF") w := csv.NewWriter(file) - for i := range _map { - _map[i] - } + //for i := range _map { + // //_map[i] + //} w.Write([]string{"开发者名称", "开发者邮箱", "应用名称"}) // 写文件需要flush,不然缓存满了,后面的就写不进去了,只会写一部分 w.Flush() diff --git a/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go b/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go new file mode 100644 index 00000000..7d6d9054 --- /dev/null +++ b/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "dsSupport/Utils/DbUtil" + "dsSupport/models" + "encoding/json" + "fmt" + "github.com/olivere/elastic/v7" + "github.com/tidwall/gjson" + "log" + "os" + "time" +) + +// 深度分页 +// https://www.cnblogs.com/hello-shf/p/11543453.html + +var client elastic.Client +var db = DbUtil.Engine + +func main() { + var host = "http://10.10.14.188:9200/" + esClient, err := elastic.NewClient( + elastic.SetURL(host), + elastic.SetSniff(false), + elastic.SetHealthcheckInterval(10*time.Second), + elastic.SetGzip(true), + elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), + elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), + ) + + if err != nil { + panic(err) + } + + //要同步的索引名称,也就是表名称 + indexName := "org_school" + //取所有 + CTX := context.Background() + result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) + if err != nil { + panic(err) + } + //第一次的结果集 + for i := range result.Hits.Hits { + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + resStr := string(resByte) + value := gjson.Get(resStr, "data_content") + //如果是第一次的话而且表不存在的情况下,重新创建表结构 + m := make(map[string]interface{}) + json.Unmarshal([]byte(value.String()), &m) + if len(m) > 0 && i == 0 { + addFieldData(indexName, m) + break + } + } +} + +/** +功能:创建一张INDEX对应的表 +*/ +func addFieldData(tableName string, m map[string]interface{}) { + //换算table_id + sql := `select table_id from t_dw_table where table_name=?` + list, _ := db.SQL(sql, tableName).Query().List() + tableId := list[0]["table_id"].(int64) + //先删除后插入 + sql = `delete from t_dw_table_field where table_id=?` + db.SQL(sql, tableId).Execute() + //准备插入 + for k, v := range m { + model := new(models.TDwTableField) + model.TableId = int32(tableId) + model.FieldName = k + switch v.(type) { + case float64: + model.DataType = "int" + model.FieldLength = 4 + model.DecimalPointLength = 0 + case string: + model.DataType = "varchar" + model.FieldLength = 128 + model.DecimalPointLength = 0 + } + db.Insert(model) + } + fmt.Println("完成现有Es Mapping的结构反向初始化工作,将手工修改表t_dw_table_field中数据!尤其是主键,一定要设置啊!") +} diff --git a/dsSupport/Utils/GpUtil/GpUtil.go b/dsSupport/Utils/PgUtil/PgUtil.go similarity index 98% rename from dsSupport/Utils/GpUtil/GpUtil.go rename to dsSupport/Utils/PgUtil/PgUtil.go index 7b3e03af..eaac874c 100644 --- a/dsSupport/Utils/GpUtil/GpUtil.go +++ b/dsSupport/Utils/PgUtil/PgUtil.go @@ -1,4 +1,4 @@ -package GpUtil +package PgUtil import ( "dsSupport/Const/ErrorConst" diff --git a/dsSupport/go.mod b/dsSupport/go.mod index c7955465..499c2e9e 100644 --- a/dsSupport/go.mod +++ b/dsSupport/go.mod @@ -27,7 +27,7 @@ require ( github.com/swaggo/gin-swagger v1.2.0 github.com/syndtr/goleveldb v1.0.0 // indirect github.com/tealeg/xlsx v1.0.5 // indirect - github.com/tidwall/gjson v1.6.1 // indirect + github.com/tidwall/gjson v1.6.1 github.com/xormplus/builder v0.0.0-20200331055651-240ff40009be github.com/xormplus/core v0.0.0-20200308074340-f3bce19d5f31 github.com/xormplus/xorm v0.0.0-20200731130200-6811f3bde592 diff --git a/dsSupport/models/t_dw_table.go b/dsSupport/models/t_dw_table.go new file mode 100644 index 00000000..bc01a486 --- /dev/null +++ b/dsSupport/models/t_dw_table.go @@ -0,0 +1,6 @@ +package models + +type TDwTable struct { + TableId int32 `xorm:"not null pk autoincr INT(255)"` + TableName string `xorm:"not null VARCHAR(255)"` +} diff --git a/dsSupport/models/t_dw_table_field.go b/dsSupport/models/t_dw_table_field.go new file mode 100644 index 00000000..7d73919c --- /dev/null +++ b/dsSupport/models/t_dw_table_field.go @@ -0,0 +1,13 @@ +package models + +type TDwTableField struct { + Id int32 `xorm:"not null pk autoincr INT(11)"` + TableId int32 `xorm:"not null comment('哪张表') index INT(11)"` + FieldName string `xorm:"not null comment('字段名称') VARCHAR(255)"` + DataType string `xorm:"not null comment('数据类型') VARCHAR(255)"` + FieldLength int32 `xorm:"not null comment('字段长度') INT(11)"` + DecimalPointLength int32 `xorm:"not null default 0 comment('小数点后长度') INT(11)"` + Comment string `xorm:"default 'NULL' comment('描述') VARCHAR(255)"` + IsPk int32 `xorm:"not null default 0 comment('是否为主键') INT(255)"` + IsNull int32 `xorm:"not null comment('是否可为空') INT(255)"` +} From 80c4058a64814a5ccd7bd727953906d12eb428f5 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 13:41:12 +0800 Subject: [PATCH 04/20] 'commit' --- .../ElasticsearchToGreenPlum.go} | 26 +++---------------- 1 file changed, 3 insertions(+), 23 deletions(-) rename dsSupport/Test/{ElasticsearchToCsv/ReadElasticsearchToCsv.go => ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go} (69%) diff --git a/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go similarity index 69% rename from dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go rename to dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 1e49f83b..0fae34d2 100644 --- a/dsSupport/Test/ElasticsearchToCsv/ReadElasticsearchToCsv.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -3,7 +3,6 @@ package main import ( "context" "dsSupport/Utils/PgUtil" - "encoding/csv" "encoding/json" "fmt" "github.com/olivere/elastic/v7" @@ -67,12 +66,9 @@ func main() { for i := range result.Hits.Hits { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) - jsonStr := string(resByte) - m := make(map[string]interface{}) - json.Unmarshal([]byte(jsonStr), &m) - for k, v := range m { - fmt.Printf("%v: %v\n", k, v) - } + resStr := string(resByte) + value := gjson.Get(resStr, "data_content") + fmt.Println(value.String()) } } else { //没有数据了 @@ -81,19 +77,3 @@ func main() { } } -func SaveToCsv(csvFileName string, _map []map[string]interface{}) { - file, err := os.OpenFile(csvFileName, os.O_CREATE|os.O_RDWR, 0644) - if err != nil { - fmt.Println("open file is failed, err: ", err) - } - defer file.Close() - // 写入UTF-8 BOM,防止中文乱码 - file.WriteString("\xEF\xBB\xBF") - w := csv.NewWriter(file) - //for i := range _map { - // //_map[i] - //} - w.Write([]string{"开发者名称", "开发者邮箱", "应用名称"}) - // 写文件需要flush,不然缓存满了,后面的就写不进去了,只会写一部分 - w.Flush() -} From b81c39cd6b129c511fb524c3d381c8b79d628944 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 13:59:00 +0800 Subject: [PATCH 05/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 43 ++++++++++++++++++- dsSupport/Utils/CommonUtil/CommonUtil.go | 3 ++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 0fae34d2..74476b8c 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -2,6 +2,7 @@ package main import ( "context" + "dsSupport/Utils/CommonUtil" "dsSupport/Utils/PgUtil" "encoding/json" "fmt" @@ -46,7 +47,7 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - fmt.Println(value.String()) + addRecord(indexName, value.String()) } //继续用的 scoll_id scrollId := result.ScrollId @@ -68,7 +69,7 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - fmt.Println(value.String()) + addRecord(indexName, value.String()) } } else { //没有数据了 @@ -76,4 +77,42 @@ func main() { } } } +func addRecord(tableName string, jsonStr string) { + sql := `insert into ` + tableName + "(" + var mymap map[string]interface{} + if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { + for key, _ := range mymap { + sql += key + "," + } + sql = sql[0 : len(sql)-1] + sql += ") values(" + for _, value := range mymap { + switch value.(type) { + case string: + sql += "'" + value.(string) + "'," + break + case int64: + sql += CommonUtil.ConvertInt64ToString(value.(int64)) + "," + break + case float64: + sql += fmt.Sprintf("%d", int64(value.(float64)))+ "," + break + default: + break + } + } + sql = sql[0 : len(sql)-1] + } else { + fmt.Println(err.Error()) + } + + sql += ");" + fmt.Println(sql) + _,err:=db.SQL(sql).Execute() + if err!=nil{ + fmt.Println("成功插入一条数据!") + }else{ + panic(err.Error()) + } +} diff --git a/dsSupport/Utils/CommonUtil/CommonUtil.go b/dsSupport/Utils/CommonUtil/CommonUtil.go index d07f1eac..9858b572 100644 --- a/dsSupport/Utils/CommonUtil/CommonUtil.go +++ b/dsSupport/Utils/CommonUtil/CommonUtil.go @@ -363,6 +363,9 @@ func ConvertIntegerArrayToStringArray(nums []int) []string { func ConvertInt64ToString(int64 int64) string { return strconv.FormatInt(int64, 10) } +func ConvertFloatt64ToString(float64 float64) string { + return strconv.FormatFloat(float64, 'E', -1, 64) +} /** 功能:将int转化为string From 9e95c884b4106360efa768a5b8cf6e4fef5534f6 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 14:20:05 +0800 Subject: [PATCH 06/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 71 ++++++++++++++----- 1 file changed, 52 insertions(+), 19 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 74476b8c..d0156664 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -36,6 +36,23 @@ func main() { //要同步的索引名称,也就是表名称 indexName := "org_school" + + sql := `SELECT + pg_constraint.conname AS pk_name, + pg_attribute.attname AS colname, + pg_type.typname AS typename + FROM + pg_constraint + INNER JOIN pg_class ON pg_constraint.conrelid = pg_class.oid + INNER JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid + AND pg_attribute.attnum = pg_constraint.conkey [ 1 ] + INNER JOIN pg_type ON pg_type.oid = pg_attribute.atttypid + WHERE + pg_class.relname = ? + AND pg_constraint.contype = 'p'` + list, _ := db.SQL(sql, indexName).Query().List() + pk := list[0]["colname"].(string) + //取所有 CTX := context.Background() result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) @@ -47,7 +64,7 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(indexName, value.String()) + addRecord(indexName, pk, value.String()) } //继续用的 scoll_id scrollId := result.ScrollId @@ -69,7 +86,7 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(indexName, value.String()) + addRecord(indexName, pk, value.String()) } } else { //没有数据了 @@ -77,42 +94,58 @@ func main() { } } } -func addRecord(tableName string, jsonStr string) { - sql := `insert into ` + tableName + "(" +func addRecord(tableName string, pk string, jsonStr string) { var mymap map[string]interface{} + var sql = "" if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { - for key, _ := range mymap { - sql += key + "," + //先删除再插入 + sql = `delete from ` + tableName + ` where ` + pk + `=?` + switch (mymap[pk]).(type) { + case string: + _, err := db.SQL(sql, mymap[pk].(string)).Execute() + if err != nil { + panic(err.Error()) + } + break + case float64: + _, err := db.SQL(sql, int64(mymap[pk].(float64))).Execute() + if err != nil { + panic(err.Error()) + } + break + default: + break } - sql = sql[0 : len(sql)-1] - sql += ") values(" - - for _, value := range mymap { + //插入 + sql = `insert into ` + tableName + "(" + var lineSql = "" + for key, value := range mymap { + sql += key + "," switch value.(type) { case string: - sql += "'" + value.(string) + "'," + lineSql += "'" + value.(string) + "'," break case int64: - sql += CommonUtil.ConvertInt64ToString(value.(int64)) + "," + lineSql += CommonUtil.ConvertInt64ToString(value.(int64)) + "," break case float64: - sql += fmt.Sprintf("%d", int64(value.(float64)))+ "," + lineSql += fmt.Sprintf("%d", int64(value.(float64))) + "," break default: break } } + lineSql = lineSql[0 : len(lineSql)-1] sql = sql[0 : len(sql)-1] + sql += ") values(" + lineSql + ");" } else { fmt.Println(err.Error()) } - - sql += ");" fmt.Println(sql) - _,err:=db.SQL(sql).Execute() - if err!=nil{ - fmt.Println("成功插入一条数据!") - }else{ + _, err := db.SQL(sql).Execute() + if err != nil { panic(err.Error()) + } else { + fmt.Println("成功插入一条数据!") } } From 601b9d0a9d78721656320406f4334ce2149f6b3f Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 14:24:37 +0800 Subject: [PATCH 07/20] 'commit' --- .../Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index d0156664..39c7c8c4 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -94,6 +94,7 @@ func main() { } } } +//后续优化为到达一定的阀值再提交一次,初步定为200一次 func addRecord(tableName string, pk string, jsonStr string) { var mymap map[string]interface{} var sql = "" From 7b4b7049f4c44bc8aec5529c463950d7aaef2f7c Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 14:27:30 +0800 Subject: [PATCH 08/20] 'commit' --- .../Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 39c7c8c4..cc71ac80 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -14,6 +14,9 @@ import ( ) // 深度分页 +// TODO +// 后续需要记录每个INDEX的最后同步时间,形成日志文件,现在是一次性全量同步 +// 因目前是以业务主键ID保持,所以比ES的数据量肯定是要少的,因为更新的日志记录都没有同步到数仓,只是最终有效的业务ID同步过来。 // https://www.cnblogs.com/hello-shf/p/11543453.html var client elastic.Client From 036bd31a331a675e34fe5a5705effba31254b7ea Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 16:58:05 +0800 Subject: [PATCH 09/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index cc71ac80..d24999f5 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -21,6 +21,8 @@ import ( var client elastic.Client var db = PgUtil.Engine +var sqls = make([]string, 0) +var keyMap = make(map[string]int, 0) func main() { var host = "http://10.10.14.188:9200/" @@ -58,7 +60,7 @@ func main() { //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(1000).Do(CTX) if err != nil { panic(err) } @@ -69,16 +71,22 @@ func main() { value := gjson.Get(resStr, "data_content") addRecord(indexName, pk, value.String()) } + //批量执行 + commitTran(sqls) + //继续用的 scoll_id scrollId := result.ScrollId //第一次命中的个数 var nowCount = int64(len(result.Hits.Hits)) + //总的命中个数 allCount := result.TotalHits() //开始循环 for { + //清空 + sqls = sqls[0:0] //如果还有数据没有获取 if allCount > nowCount { result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) @@ -95,27 +103,24 @@ func main() { //没有数据了 break } + //批量执行 + commitTran(sqls) } } + //后续优化为到达一定的阀值再提交一次,初步定为200一次 func addRecord(tableName string, pk string, jsonStr string) { var mymap map[string]interface{} var sql = "" if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { //先删除再插入 - sql = `delete from ` + tableName + ` where ` + pk + `=?` + sql = `delete from ` + tableName + ` where ` + pk + `=` switch (mymap[pk]).(type) { case string: - _, err := db.SQL(sql, mymap[pk].(string)).Execute() - if err != nil { - panic(err.Error()) - } + sqls = append(sqls, sql+"'"+mymap[pk].(string)+"';") break case float64: - _, err := db.SQL(sql, int64(mymap[pk].(float64))).Execute() - if err != nil { - panic(err.Error()) - } + sqls = append(sqls, sql+CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64)))+";") break default: break @@ -145,11 +150,24 @@ func addRecord(tableName string, pk string, jsonStr string) { } else { fmt.Println(err.Error()) } - fmt.Println(sql) - _, err := db.SQL(sql).Execute() + sqls = append(sqls, sql) +} + +//提交事务 +func commitTran(sqls []string) { + session := db.NewSession() + defer session.Close() + err := session.Begin() + for i := range sqls { + _, err = session.Insert(sqls[i]) + if err != nil { + fmt.Println(err.Error()) + session.Rollback() + return + } + } + err = session.Commit() if err != nil { - panic(err.Error()) - } else { - fmt.Println("成功插入一条数据!") + return } } From 45d8874b3a968c508b6894a414b47b2af67f3422 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 17:00:08 +0800 Subject: [PATCH 10/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index d24999f5..73ec1f35 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -72,7 +72,7 @@ func main() { addRecord(indexName, pk, value.String()) } //批量执行 - commitTran(sqls) + batchSave(sqls) //继续用的 scoll_id scrollId := result.ScrollId @@ -104,7 +104,7 @@ func main() { break } //批量执行 - commitTran(sqls) + batchSave(sqls) } } @@ -153,21 +153,12 @@ func addRecord(tableName string, pk string, jsonStr string) { sqls = append(sqls, sql) } -//提交事务 -func commitTran(sqls []string) { - session := db.NewSession() - defer session.Close() - err := session.Begin() +//提交 +func batchSave(sqls []string) { for i := range sqls { - _, err = session.Insert(sqls[i]) + _, err := db.SQL(sqls[i]).Execute() if err != nil { - fmt.Println(err.Error()) - session.Rollback() - return + panic(err) } } - err = session.Commit() - if err != nil { - return - } } From 35f9808db1ef90baa3bbb15c13d685b21432421b Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Fri, 4 Sep 2020 21:56:13 +0800 Subject: [PATCH 11/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 41 +++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 73ec1f35..59912bb6 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -10,6 +10,7 @@ import ( "github.com/tidwall/gjson" "log" "os" + "sort" "time" ) @@ -113,6 +114,14 @@ func addRecord(tableName string, pk string, jsonStr string) { var mymap map[string]interface{} var sql = "" if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { + //拿到key + var keys []string + for k := range mymap { + keys = append(keys, k) + } + //对key排序 + sort.Strings(keys) + //先删除再插入 sql = `delete from ` + tableName + ` where ` + pk + `=` switch (mymap[pk]).(type) { @@ -128,17 +137,19 @@ func addRecord(tableName string, pk string, jsonStr string) { //插入 sql = `insert into ` + tableName + "(" var lineSql = "" - for key, value := range mymap { - sql += key + "," - switch value.(type) { + + //根据key从m中拿元素,就是按顺序拿了 + for _, k := range keys { + sql += k + "," + switch mymap[k].(type) { case string: - lineSql += "'" + value.(string) + "'," + lineSql += "'" + mymap[k].(string) + "'," break case int64: - lineSql += CommonUtil.ConvertInt64ToString(value.(int64)) + "," + lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + "," break case float64: - lineSql += fmt.Sprintf("%d", int64(value.(float64))) + "," + lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + "," break default: break @@ -155,10 +166,16 @@ func addRecord(tableName string, pk string, jsonStr string) { //提交 func batchSave(sqls []string) { - for i := range sqls { - _, err := db.SQL(sqls[i]).Execute() - if err != nil { - panic(err) - } - } + fmt.Println(sqls) + //_, err := db.SQL(sqls[0:2]).Execute() + //if err != nil { + // panic(err) + //} + + //for i := range sqls { + // _, err := db.SQL(sqls[i]).Execute() + // if err != nil { + // panic(err) + // } + //} } From 9fd2adaeb3c6d756c72a08f6d6139d1dc2598549 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Sat, 5 Sep 2020 06:51:31 +0800 Subject: [PATCH 12/20] 'commit' --- dsSupport/Test/CreatePgTable/CreatePgTable.go | 35 +++++++++---------- .../ElasticsearchToGreenPlum.go | 2 -- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/dsSupport/Test/CreatePgTable/CreatePgTable.go b/dsSupport/Test/CreatePgTable/CreatePgTable.go index d11a8eec..3fca7c3c 100644 --- a/dsSupport/Test/CreatePgTable/CreatePgTable.go +++ b/dsSupport/Test/CreatePgTable/CreatePgTable.go @@ -25,10 +25,22 @@ func main() { //2、根据表名,获取相应的创建表的信息 sql = `select * from t_dw_table_field where table_id=?` list2, _ := db.SQL(sql, tableId).Query().List() + //手动增加两个字段:uuid+enable_flag + _map := make(map[string]interface{}, 0) + _map["data_type"] = "char" + _map["field_length"] = CommonUtil.ConvertIntToInt64(36) + _map["field_name"] = "uuid" + list2 = append(list2, _map) + + _map2 := make(map[string]interface{}, 0) + _map2["data_type"] = "int" + _map2["field_length"] = CommonUtil.ConvertIntToInt64(4) + _map2["field_name"] = "enable_flag" + list2 = append(list2, _map2) + for i2 := range list2 { dataType := list2[i2]["data_type"].(string) fieldLength := list2[i2]["field_length"].(int64) - isNull := list2[i2]["is_null"].(int64) fieldName := list2[i2]["field_name"].(string) var c = "" if dataType == "int" { @@ -36,31 +48,18 @@ func main() { } else { c = dataType + "(" + CommonUtil.ConvertInt64ToString(fieldLength) + ") " + ` COLLATE "pg_catalog"."default"` } - if isNull == 0 { - c += " NOT NULL," - } else { - c += "," - } + c += "," createTableSql += `"` + fieldName + `" ` + c + "\r\n" } - createTableSql=createTableSql[0:len(createTableSql)-3] + createTableSql = createTableSql[0 : len(createTableSql)-3] createTableSql += `);` + "\r\n" _, err := pgDb.SQL(createTableSql).Execute() if err != nil { fmt.Println(createTableSql) panic(err.Error()) } - //主键有哪些 - var pks = "" - for i2 := range list2 { - fieldName := list2[i2]["field_name"].(string) - isPk := list2[i2]["is_pk"].(int64) - if isPk == 1 { - pks += fieldName + "," - } - } - //去掉最后一个逗号 - pks = pks[0 : len(pks)-1] + //主键 + var pks = "uuid" //添加主键描述 createTableSql = `ALTER TABLE "public"."` + tableName + `" ADD CONSTRAINT "` + tableName + `_dw_pkey" PRIMARY KEY ("` + pks + `");` + "\r\n" _, err = pgDb.SQL(createTableSql).Execute() diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 59912bb6..8cfe6d59 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -77,10 +77,8 @@ func main() { //继续用的 scoll_id scrollId := result.ScrollId - //第一次命中的个数 var nowCount = int64(len(result.Hits.Hits)) - //总的命中个数 allCount := result.TotalHits() From 319590a92edb3fd735a3041dc677d50d6b19dd7c Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Sat, 5 Sep 2020 07:37:43 +0800 Subject: [PATCH 13/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 129 ++++++++++-------- 1 file changed, 70 insertions(+), 59 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 8cfe6d59..3cc41826 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -3,6 +3,7 @@ package main import ( "context" "dsSupport/Utils/CommonUtil" + "dsSupport/Utils/DbUtil" "dsSupport/Utils/PgUtil" "encoding/json" "fmt" @@ -21,8 +22,10 @@ import ( // https://www.cnblogs.com/hello-shf/p/11543453.html var client elastic.Client -var db = PgUtil.Engine -var sqls = make([]string, 0) +var mysqlDb = DbUtil.Engine +var pgDb = PgUtil.Engine +var insertStrArray = make([]string, 0) +var pkStrArray = make([]string, 0) var keyMap = make(map[string]int, 0) func main() { @@ -42,26 +45,16 @@ func main() { //要同步的索引名称,也就是表名称 indexName := "org_school" - - sql := `SELECT - pg_constraint.conname AS pk_name, - pg_attribute.attname AS colname, - pg_type.typname AS typename - FROM - pg_constraint - INNER JOIN pg_class ON pg_constraint.conrelid = pg_class.oid - INNER JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid - AND pg_attribute.attnum = pg_constraint.conkey [ 1 ] - INNER JOIN pg_type ON pg_type.oid = pg_attribute.atttypid - WHERE - pg_class.relname = ? - AND pg_constraint.contype = 'p'` - list, _ := db.SQL(sql, indexName).Query().List() - pk := list[0]["colname"].(string) - + //从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持 + sql := "select * from t_dw_table where table_name=?" + list, _ := mysqlDb.SQL(sql, indexName).Query().List() + tableId := list[0]["table_id"].(int64) + sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1` + list, _ = mysqlDb.SQL(sql, tableId).Query().List() + pk := list[0]["field_name"].(string) //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(1000).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(100).Do(CTX) if err != nil { panic(err) } @@ -70,10 +63,10 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(indexName, pk, value.String()) + addRecord(pk, value.String()) } //批量执行 - batchSave(sqls) + batchSave(indexName, pk) //继续用的 scoll_id scrollId := result.ScrollId @@ -85,7 +78,8 @@ func main() { //开始循环 for { //清空 - sqls = sqls[0:0] + insertStrArray = insertStrArray[0:0] + pkStrArray = pkStrArray[0:0] //如果还有数据没有获取 if allCount > nowCount { result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) @@ -96,49 +90,43 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(indexName, pk, value.String()) + addRecord(pk, value.String()) } } else { //没有数据了 break } //批量执行 - batchSave(sqls) + batchSave(indexName, pk) } } //后续优化为到达一定的阀值再提交一次,初步定为200一次 -func addRecord(tableName string, pk string, jsonStr string) { +func addRecord(pk string, jsonStr string) { var mymap map[string]interface{} - var sql = "" if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { - //拿到key - var keys []string - for k := range mymap { - keys = append(keys, k) - } - //对key排序 - sort.Strings(keys) - - //先删除再插入 - sql = `delete from ` + tableName + ` where ` + pk + `=` + //组装pkArray,用来update enable_flag switch (mymap[pk]).(type) { case string: - sqls = append(sqls, sql+"'"+mymap[pk].(string)+"';") + pkStrArray = append(pkStrArray, mymap[pk].(string)) break case float64: - sqls = append(sqls, sql+CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64)))+";") + pkStrArray = append(pkStrArray, CommonUtil.ConvertInt64ToString(int64(mymap[pk].(float64)))) break default: break } - //插入 - sql = `insert into ` + tableName + "(" + //另一个任务是组装insertStrArray,准备批量提交 + //拿到key + var keys []string + for k := range mymap { + keys = append(keys, k) + } + //对key排序 + sort.Strings(keys) var lineSql = "" - //根据key从m中拿元素,就是按顺序拿了 for _, k := range keys { - sql += k + "," switch mymap[k].(type) { case string: lineSql += "'" + mymap[k].(string) + "'," @@ -154,26 +142,49 @@ func addRecord(tableName string, pk string, jsonStr string) { } } lineSql = lineSql[0 : len(lineSql)-1] - sql = sql[0 : len(sql)-1] - sql += ") values(" + lineSql + ");" + insertStrArray = append(insertStrArray, lineSql) } else { fmt.Println(err.Error()) } - sqls = append(sqls, sql) } //提交 -func batchSave(sqls []string) { - fmt.Println(sqls) - //_, err := db.SQL(sqls[0:2]).Execute() - //if err != nil { - // panic(err) - //} - - //for i := range sqls { - // _, err := db.SQL(sqls[i]).Execute() - // if err != nil { - // panic(err) - // } - //} +func batchSave(tableName string, pkName string) { + //fmt.Println(pkStrArray) + //fmt.Println(insertStrArray) + var pkStr = "" + for i := range pkStrArray { + pkStr += "'" + pkStrArray[i] + "'," + } + pkStr = pkStr[0 : len(pkStr)-1] + sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr+")" + pgDb.SQL(sql).Execute() + /* + //插入 + sql = `insert into ` + tableName + "(" + var lineSql = "" + + //根据key从m中拿元素,就是按顺序拿了 + for _, k := range keys { + sql += k + "," + switch mymap[k].(type) { + case string: + lineSql += "'" + mymap[k].(string) + "'," + break + case int64: + lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + "," + break + case float64: + lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + "," + break + default: + break + } + } + lineSql = lineSql[0 : len(lineSql)-1] + sql = sql[0 : len(sql)-1] + sql += ") values(" + lineSql + ");" + } else { + fmt.Println(err.Error()) + */ } From 166766c920438c5bcb41f61b5a871d5d43762c62 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Sat, 5 Sep 2020 08:36:54 +0800 Subject: [PATCH 14/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 67 ++++++++----------- 1 file changed, 28 insertions(+), 39 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 3cc41826..c676b6dd 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -45,6 +45,7 @@ func main() { //要同步的索引名称,也就是表名称 indexName := "org_school" + keys := make([]string, 0) //从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持 sql := "select * from t_dw_table where table_name=?" list, _ := mysqlDb.SQL(sql, indexName).Query().List() @@ -54,7 +55,7 @@ func main() { pk := list[0]["field_name"].(string) //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(100).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX) if err != nil { panic(err) } @@ -63,10 +64,10 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(pk, value.String()) + keys = addRecord(pk, value.String()) } //批量执行 - batchSave(indexName, pk) + batchSave(indexName, pk, keys) //继续用的 scoll_id scrollId := result.ScrollId @@ -90,20 +91,21 @@ func main() { resByte, _ := json.Marshal(result.Hits.Hits[i].Source) resStr := string(resByte) value := gjson.Get(resStr, "data_content") - addRecord(pk, value.String()) + keys = addRecord(pk, value.String()) } } else { //没有数据了 break } //批量执行 - batchSave(indexName, pk) + batchSave(indexName, pk, keys) } } //后续优化为到达一定的阀值再提交一次,初步定为200一次 -func addRecord(pk string, jsonStr string) { +func addRecord(pk string, jsonStr string) []string { var mymap map[string]interface{} + var keys []string if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { //组装pkArray,用来update enable_flag switch (mymap[pk]).(type) { @@ -118,7 +120,6 @@ func addRecord(pk string, jsonStr string) { } //另一个任务是组装insertStrArray,准备批量提交 //拿到key - var keys []string for k := range mymap { keys = append(keys, k) } @@ -141,50 +142,38 @@ func addRecord(pk string, jsonStr string) { break } } - lineSql = lineSql[0 : len(lineSql)-1] + lineSql+="'"+CommonUtil.GetUUID()+"'" insertStrArray = append(insertStrArray, lineSql) } else { fmt.Println(err.Error()) } + return keys } //提交 -func batchSave(tableName string, pkName string) { - //fmt.Println(pkStrArray) - //fmt.Println(insertStrArray) +func batchSave(tableName string, pkName string, keys []string) { var pkStr = "" for i := range pkStrArray { pkStr += "'" + pkStrArray[i] + "'," } pkStr = pkStr[0 : len(pkStr)-1] - sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr+")" + sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr + ")" pgDb.SQL(sql).Execute() - /* - //插入 - sql = `insert into ` + tableName + "(" - var lineSql = "" - //根据key从m中拿元素,就是按顺序拿了 - for _, k := range keys { - sql += k + "," - switch mymap[k].(type) { - case string: - lineSql += "'" + mymap[k].(string) + "'," - break - case int64: - lineSql += CommonUtil.ConvertInt64ToString(mymap[k].(int64)) + "," - break - case float64: - lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + "," - break - default: - break - } - } - lineSql = lineSql[0 : len(lineSql)-1] - sql = sql[0 : len(sql)-1] - sql += ") values(" + lineSql + ");" - } else { - fmt.Println(err.Error()) - */ + //插入 + sql = `insert into ` + tableName + "(" + //根据key从m中拿元素,就是按顺序拿了 + for _, k := range keys { + sql += k + "," + } + sql+="uuid" + + var lineSql = "" + for i := range insertStrArray { + lineSql += "(" + insertStrArray[i] + ")," + } + lineSql = lineSql[0 : len(lineSql)-1] + sql += ") values " + lineSql + ";" + pgDb.SQL(sql).Execute() + fmt.Println("批量执行"+ CommonUtil.ConvertIntToString(len(insertStrArray))+"条.") } From 9bc168f806ea96a8236fe83e5b0de5a88f7abe4e Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Sat, 5 Sep 2020 08:38:14 +0800 Subject: [PATCH 15/20] 'commit' --- .../Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index c676b6dd..30c70941 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -55,7 +55,7 @@ func main() { pk := list[0]["field_name"].(string) //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(1000).Do(CTX) if err != nil { panic(err) } From 14ba2fcfb9424266dbdb3082d3d18a964f863582 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Sat, 5 Sep 2020 09:03:25 +0800 Subject: [PATCH 16/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 18 +++++++++++------ dsSupport/Utils/CommonUtil/CommonUtil.go | 20 +++++++++++++++++-- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 30c70941..8a82c369 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -55,7 +55,7 @@ func main() { pk := list[0]["field_name"].(string) //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(1000).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX) if err != nil { panic(err) } @@ -142,7 +142,7 @@ func addRecord(pk string, jsonStr string) []string { break } } - lineSql+="'"+CommonUtil.GetUUID()+"'" + lineSql += "'" + CommonUtil.GetUUID() + "'" insertStrArray = append(insertStrArray, lineSql) } else { fmt.Println(err.Error()) @@ -153,11 +153,13 @@ func addRecord(pk string, jsonStr string) []string { //提交 func batchSave(tableName string, pkName string, keys []string) { var pkStr = "" + //数组去重 + pkStrArray = CommonUtil.RemoveRepeatedElement(pkStrArray) for i := range pkStrArray { pkStr += "'" + pkStrArray[i] + "'," } pkStr = pkStr[0 : len(pkStr)-1] - sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr + ")" + sql := `update ` + tableName + " set enable_flag=0 where " + pkName + " in (" + pkStr + ") and enable_flag=1" pgDb.SQL(sql).Execute() //插入 @@ -166,7 +168,7 @@ func batchSave(tableName string, pkName string, keys []string) { for _, k := range keys { sql += k + "," } - sql+="uuid" + sql += "uuid" var lineSql = "" for i := range insertStrArray { @@ -174,6 +176,10 @@ func batchSave(tableName string, pkName string, keys []string) { } lineSql = lineSql[0 : len(lineSql)-1] sql += ") values " + lineSql + ";" - pgDb.SQL(sql).Execute() - fmt.Println("批量执行"+ CommonUtil.ConvertIntToString(len(insertStrArray))+"条.") + _,err:=pgDb.SQL(sql).Execute() + if err!=nil{ + fmt.Println(sql) + //panic(err) + } + fmt.Println("批量执行" + CommonUtil.ConvertIntToString(len(insertStrArray)) + "条.") } diff --git a/dsSupport/Utils/CommonUtil/CommonUtil.go b/dsSupport/Utils/CommonUtil/CommonUtil.go index 9858b572..fa7669c7 100644 --- a/dsSupport/Utils/CommonUtil/CommonUtil.go +++ b/dsSupport/Utils/CommonUtil/CommonUtil.go @@ -363,8 +363,24 @@ func ConvertIntegerArrayToStringArray(nums []int) []string { func ConvertInt64ToString(int64 int64) string { return strconv.FormatInt(int64, 10) } -func ConvertFloatt64ToString(float64 float64) string { - return strconv.FormatFloat(float64, 'E', -1, 64) +/** +功能:数组去重 + */ +func RemoveRepeatedElement(arr []string) (newArr []string) { + newArr = make([]string, 0) + for i := 0; i < len(arr); i++ { + repeat := false + for j := i + 1; j < len(arr); j++ { + if arr[i] == arr[j] { + repeat = true + break + } + } + if !repeat { + newArr = append(newArr, arr[i]) + } + } + return } /** From 52fa055c3509b6d21fa93c17cf5134531b19a815 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Mon, 7 Sep 2020 08:22:28 +0800 Subject: [PATCH 17/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 175 ++++++++++-------- 1 file changed, 100 insertions(+), 75 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 8a82c369..e6200541 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -11,24 +11,35 @@ import ( "github.com/tidwall/gjson" "log" "os" - "sort" "time" ) // 深度分页 -// TODO -// 后续需要记录每个INDEX的最后同步时间,形成日志文件,现在是一次性全量同步 -// 因目前是以业务主键ID保持,所以比ES的数据量肯定是要少的,因为更新的日志记录都没有同步到数仓,只是最终有效的业务ID同步过来。 // https://www.cnblogs.com/hello-shf/p/11543453.html -var client elastic.Client +//MYSQL数据连接器,用于读取元数据信息 var mysqlDb = DbUtil.Engine + +//GreenPlum数据连接器,用于写入、修改GPDB的数据 var pgDb = PgUtil.Engine + +//用于批量一次性插入的数据集合 var insertStrArray = make([]string, 0) + +//用于批量更新的数据集合,一般是业务主键ID集合,修改enable_falg=0 var pkStrArray = make([]string, 0) -var keyMap = make(map[string]int, 0) + +//缓存表的结构 +type CacheFieldStruct struct { + cacheFieldKeys []string + tableId int +} + +var CacheFieldBean CacheFieldStruct func main() { + //TODO + //需要将ES的连接信息修改到配置文件中,目前想来应该是数据交换平台的概念,需要在dsDataex项目中实现,待迁移 var host = "http://10.10.14.188:9200/" esClient, err := elastic.NewClient( elastic.SetURL(host), @@ -38,74 +49,94 @@ func main() { elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), ) - if err != nil { + //如果是集成到dsDataex项目中,以协程方式启动,这里就不有使用painc(err)会异常会导致程序退出 panic(err) } - - //要同步的索引名称,也就是表名称 - indexName := "org_school" - keys := make([]string, 0) //从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持 - sql := "select * from t_dw_table where table_name=?" - list, _ := mysqlDb.SQL(sql, indexName).Query().List() - tableId := list[0]["table_id"].(int64) - sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1` - list, _ = mysqlDb.SQL(sql, tableId).Query().List() - pk := list[0]["field_name"].(string) - //取所有 - CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX) - if err != nil { - panic(err) - } - //第一次的结果集 - for i := range result.Hits.Hits { - resByte, _ := json.Marshal(result.Hits.Hits[i].Source) - resStr := string(resByte) - value := gjson.Get(resStr, "data_content") - keys = addRecord(pk, value.String()) - } - //批量执行 - batchSave(indexName, pk, keys) - - //继续用的 scoll_id - scrollId := result.ScrollId - //第一次命中的个数 - var nowCount = int64(len(result.Hits.Hits)) - //总的命中个数 - allCount := result.TotalHits() - - //开始循环 - for { - //清空 - insertStrArray = insertStrArray[0:0] - pkStrArray = pkStrArray[0:0] - //如果还有数据没有获取 - if allCount > nowCount { - result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) - scrollId = result.ScrollId - nowCount += int64(len(result.Hits.Hits)) - - for i := range result.Hits.Hits { - resByte, _ := json.Marshal(result.Hits.Hits[i].Source) - resStr := string(resByte) - value := gjson.Get(resStr, "data_content") - keys = addRecord(pk, value.String()) - } - } else { - //没有数据了 - break + sql := "select * from t_dw_table" + list, _ := mysqlDb.SQL(sql).Query().List() + for i := range list { + tableId := list[i]["table_id"].(int64) + indexName := list[i]["table_name"].(string) + //预热数据表的列名 + getFields(int(tableId)) + // 当前表的主键是什么,目前只支持单业务主键,复合主键的不支持 + sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1` + list, _ = mysqlDb.SQL(sql, tableId).Query().List() + pk := list[0]["field_name"].(string) + //取所有 + CTX := context.Background() + result, err := esClient.Scroll().Index(indexName).Size(200).Do(CTX) + if err != nil { + panic(err) + } + //第一次的结果集 + for i := range result.Hits.Hits { + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + resStr := string(resByte) + value := gjson.Get(resStr, "data_content") + addRecord(pk, value.String()) } //批量执行 - batchSave(indexName, pk, keys) + batchSave(indexName, pk) + //继续用的 scoll_id + scrollId := result.ScrollId + //第一次命中的个数 + var nowCount = int64(len(result.Hits.Hits)) + //总的命中个数 + allCount := result.TotalHits() + + //开始循环 + for { + //清空 + insertStrArray = insertStrArray[0:0] + pkStrArray = pkStrArray[0:0] + //如果还有数据没有获取 + if allCount > nowCount { + result, err = esClient.Scroll().ScrollId(scrollId).Do(CTX) + scrollId = result.ScrollId + nowCount += int64(len(result.Hits.Hits)) + + for i := range result.Hits.Hits { + resByte, _ := json.Marshal(result.Hits.Hits[i].Source) + resStr := string(resByte) + value := gjson.Get(resStr, "data_content") + addRecord(pk, value.String()) + } + } else { + //没有数据了 + break + } + //批量执行 + batchSave(indexName, pk) + } + fmt.Println("索引"+indexName+"同步完成!") } } +/** +功能:获取指定表的字段有哪些,应该是一个稳定的排序,因为后面要用到 +作者:黄海 +时间:2020-09-07 +*/ +func getFields(tableId int) { + //清空 + CacheFieldBean.cacheFieldKeys = CacheFieldBean.cacheFieldKeys[:] + //填充 + sql := `select field_name from t_dw_table_field where table_id=? order by sort_id,field_name` + list, _ := mysqlDb.SQL(sql, tableId).Query().List() + + for i := range list { + fieldName := list[i]["field_name"].(string) + CacheFieldBean.cacheFieldKeys = append(CacheFieldBean.cacheFieldKeys, fieldName) + } + CacheFieldBean.tableId = tableId +} + //后续优化为到达一定的阀值再提交一次,初步定为200一次 -func addRecord(pk string, jsonStr string) []string { +func addRecord(pk string, jsonStr string) { var mymap map[string]interface{} - var keys []string if err := json.Unmarshal([]byte(jsonStr), &mymap); err == nil { //组装pkArray,用来update enable_flag switch (mymap[pk]).(type) { @@ -119,15 +150,9 @@ func addRecord(pk string, jsonStr string) []string { break } //另一个任务是组装insertStrArray,准备批量提交 - //拿到key - for k := range mymap { - keys = append(keys, k) - } - //对key排序 - sort.Strings(keys) var lineSql = "" //根据key从m中拿元素,就是按顺序拿了 - for _, k := range keys { + for _, k := range CacheFieldBean.cacheFieldKeys { switch mymap[k].(type) { case string: lineSql += "'" + mymap[k].(string) + "'," @@ -139,6 +164,7 @@ func addRecord(pk string, jsonStr string) []string { lineSql += fmt.Sprintf("%d", int64(mymap[k].(float64))) + "," break default: + lineSql += "'-1'," //进入这里一般是有这个字段,但数据源中没有这列数据,按-1填充吧~ break } } @@ -147,11 +173,10 @@ func addRecord(pk string, jsonStr string) []string { } else { fmt.Println(err.Error()) } - return keys } //提交 -func batchSave(tableName string, pkName string, keys []string) { +func batchSave(tableName string, pkName string) { var pkStr = "" //数组去重 pkStrArray = CommonUtil.RemoveRepeatedElement(pkStrArray) @@ -165,7 +190,7 @@ func batchSave(tableName string, pkName string, keys []string) { //插入 sql = `insert into ` + tableName + "(" //根据key从m中拿元素,就是按顺序拿了 - for _, k := range keys { + for _, k := range CacheFieldBean.cacheFieldKeys { sql += k + "," } sql += "uuid" @@ -176,8 +201,8 @@ func batchSave(tableName string, pkName string, keys []string) { } lineSql = lineSql[0 : len(lineSql)-1] sql += ") values " + lineSql + ";" - _,err:=pgDb.SQL(sql).Execute() - if err!=nil{ + _, err := pgDb.SQL(sql).Execute() + if err != nil { fmt.Println(sql) //panic(err) } From 4e99ffb827540ac02568930968ca0925f185aad7 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Mon, 7 Sep 2020 08:43:04 +0800 Subject: [PATCH 18/20] 'commit' --- dsSupport/Test/CreatePgTable/CreatePgTable.go | 4 ++-- .../ElasticsearchToGreenPlum.go | 14 +++++++------- .../Test/ReadEsExistMapping/ReadEsExistMapping.go | 7 +++++-- dsSupport/models/t_dw_table_field.go | 1 - 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/dsSupport/Test/CreatePgTable/CreatePgTable.go b/dsSupport/Test/CreatePgTable/CreatePgTable.go index 3fca7c3c..f6618538 100644 --- a/dsSupport/Test/CreatePgTable/CreatePgTable.go +++ b/dsSupport/Test/CreatePgTable/CreatePgTable.go @@ -54,8 +54,8 @@ func main() { createTableSql = createTableSql[0 : len(createTableSql)-3] createTableSql += `);` + "\r\n" _, err := pgDb.SQL(createTableSql).Execute() + fmt.Println(createTableSql) if err != nil { - fmt.Println(createTableSql) panic(err.Error()) } //主键 @@ -66,7 +66,7 @@ func main() { if err != nil { panic(err.Error()) } - + //bzlbm fmt.Println("恭喜,数据仓库中相应表格创建完毕!") } } diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index e6200541..8cd89d38 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -54,16 +54,16 @@ func main() { panic(err) } //从配置表中获取此索引对应的表中主键是哪个,目前只支持一个主键,无主键和多主键不支持 - sql := "select * from t_dw_table" - list, _ := mysqlDb.SQL(sql).Query().List() - for i := range list { - tableId := list[i]["table_id"].(int64) - indexName := list[i]["table_name"].(string) + sql := "select * from t_dw_table where b_use=1" + tableList, _ := mysqlDb.SQL(sql).Query().List() + for i := range tableList { + tableId := tableList[i]["table_id"].(int64) + indexName := tableList[i]["table_name"].(string) //预热数据表的列名 getFields(int(tableId)) // 当前表的主键是什么,目前只支持单业务主键,复合主键的不支持 sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1` - list, _ = mysqlDb.SQL(sql, tableId).Query().List() + list, _ := mysqlDb.SQL(sql, tableId).Query().List() pk := list[0]["field_name"].(string) //取所有 CTX := context.Background() @@ -204,7 +204,7 @@ func batchSave(tableName string, pkName string) { _, err := pgDb.SQL(sql).Execute() if err != nil { fmt.Println(sql) - //panic(err) + panic(err) } fmt.Println("批量执行" + CommonUtil.ConvertIntToString(len(insertStrArray)) + "条.") } diff --git a/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go b/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go index 7d6d9054..f28fb737 100644 --- a/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go +++ b/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go @@ -35,7 +35,7 @@ func main() { } //要同步的索引名称,也就是表名称 - indexName := "org_school" + indexName := "user_teacher_org" //取所有 CTX := context.Background() result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) @@ -83,7 +83,10 @@ func addFieldData(tableName string, m map[string]interface{}) { model.FieldLength = 128 model.DecimalPointLength = 0 } - db.Insert(model) + _,err:=db.Insert(model) + if err!=nil{ + panic(err) + } } fmt.Println("完成现有Es Mapping的结构反向初始化工作,将手工修改表t_dw_table_field中数据!尤其是主键,一定要设置啊!") } diff --git a/dsSupport/models/t_dw_table_field.go b/dsSupport/models/t_dw_table_field.go index 7d73919c..a6b485a3 100644 --- a/dsSupport/models/t_dw_table_field.go +++ b/dsSupport/models/t_dw_table_field.go @@ -9,5 +9,4 @@ type TDwTableField struct { DecimalPointLength int32 `xorm:"not null default 0 comment('小数点后长度') INT(11)"` Comment string `xorm:"default 'NULL' comment('描述') VARCHAR(255)"` IsPk int32 `xorm:"not null default 0 comment('是否为主键') INT(255)"` - IsNull int32 `xorm:"not null comment('是否可为空') INT(255)"` } From ad926ce29464dfc77cbfd7a837dd47a90ec58af6 Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Mon, 7 Sep 2020 08:48:31 +0800 Subject: [PATCH 19/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index 8cd89d38..e4713f72 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -30,12 +30,7 @@ var insertStrArray = make([]string, 0) var pkStrArray = make([]string, 0) //缓存表的结构 -type CacheFieldStruct struct { - cacheFieldKeys []string - tableId int -} - -var CacheFieldBean CacheFieldStruct +var cacheFieldKeys []string func main() { //TODO @@ -61,6 +56,10 @@ func main() { indexName := tableList[i]["table_name"].(string) //预热数据表的列名 getFields(int(tableId)) + //清空重新导入 + sql = "truncate table " + indexName + pgDb.SQL(sql).Execute() + // 当前表的主键是什么,目前只支持单业务主键,复合主键的不支持 sql = `select field_name from t_dw_table_field where table_id=? and is_pk=1` list, _ := mysqlDb.SQL(sql, tableId).Query().List() @@ -111,7 +110,7 @@ func main() { //批量执行 batchSave(indexName, pk) } - fmt.Println("索引"+indexName+"同步完成!") + fmt.Println("索引" + indexName + "同步完成!") } } @@ -122,16 +121,15 @@ func main() { */ func getFields(tableId int) { //清空 - CacheFieldBean.cacheFieldKeys = CacheFieldBean.cacheFieldKeys[:] + cacheFieldKeys = cacheFieldKeys[0:0] //填充 sql := `select field_name from t_dw_table_field where table_id=? order by sort_id,field_name` list, _ := mysqlDb.SQL(sql, tableId).Query().List() for i := range list { fieldName := list[i]["field_name"].(string) - CacheFieldBean.cacheFieldKeys = append(CacheFieldBean.cacheFieldKeys, fieldName) + cacheFieldKeys = append(cacheFieldKeys, fieldName) } - CacheFieldBean.tableId = tableId } //后续优化为到达一定的阀值再提交一次,初步定为200一次 @@ -152,7 +150,7 @@ func addRecord(pk string, jsonStr string) { //另一个任务是组装insertStrArray,准备批量提交 var lineSql = "" //根据key从m中拿元素,就是按顺序拿了 - for _, k := range CacheFieldBean.cacheFieldKeys { + for _, k := range cacheFieldKeys { switch mymap[k].(type) { case string: lineSql += "'" + mymap[k].(string) + "'," @@ -190,7 +188,7 @@ func batchSave(tableName string, pkName string) { //插入 sql = `insert into ` + tableName + "(" //根据key从m中拿元素,就是按顺序拿了 - for _, k := range CacheFieldBean.cacheFieldKeys { + for _, k := range cacheFieldKeys { sql += k + "," } sql += "uuid" From 424bb857ca89a6e8ad1ea0c0dae48b4f5e98e16f Mon Sep 17 00:00:00 2001 From: huanghai <10402852@qq.com> Date: Mon, 7 Sep 2020 08:58:20 +0800 Subject: [PATCH 20/20] 'commit' --- .../ElasticsearchToGreenPlum.go | 2 +- .../Test/ReadEsExistMapping/ReadEsExistMapping.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go index e4713f72..a349a34f 100644 --- a/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go +++ b/dsSupport/Test/ElasticsearchToGreenPlum/ElasticsearchToGreenPlum.go @@ -204,5 +204,5 @@ func batchSave(tableName string, pkName string) { fmt.Println(sql) panic(err) } - fmt.Println("批量执行" + CommonUtil.ConvertIntToString(len(insertStrArray)) + "条.") + fmt.Println(tableName+"批量执行" + CommonUtil.ConvertIntToString(len(insertStrArray)) + "条.") } diff --git a/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go b/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go index f28fb737..32884300 100644 --- a/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go +++ b/dsSupport/Test/ReadEsExistMapping/ReadEsExistMapping.go @@ -35,10 +35,10 @@ func main() { } //要同步的索引名称,也就是表名称 - indexName := "user_teacher_org" + indexName := "user_student" //取所有 CTX := context.Background() - result, err := esClient.Scroll().Index(indexName).Size(10).Do(CTX) + result, err := esClient.Scroll().Index(indexName).Size(100).Do(CTX) if err != nil { panic(err) } @@ -50,7 +50,7 @@ func main() { //如果是第一次的话而且表不存在的情况下,重新创建表结构 m := make(map[string]interface{}) json.Unmarshal([]byte(value.String()), &m) - if len(m) > 0 && i == 0 { + if len(m) > 0 && i == len(result.Hits.Hits)-1 { addFieldData(indexName, m) break } @@ -83,8 +83,8 @@ func addFieldData(tableName string, m map[string]interface{}) { model.FieldLength = 128 model.DecimalPointLength = 0 } - _,err:=db.Insert(model) - if err!=nil{ + _, err := db.Insert(model) + if err != nil { panic(err) } }