From 3a786c698747d6c48339f4f61752f23bf39e0550 Mon Sep 17 00:00:00 2001 From: zhangjun <53766543@qq.com> Date: Mon, 27 Jul 2020 09:07:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20DataCollect=20=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dsDataex/Config/Config.ini | 4 +- .../DataEX/DataexOpenapi/DataexOpenapi.go | 8 +- dsDataex/MyService/MySwagger/DataCollect.go | 12 +++ dsDataex/MyService/MySwagger/DataPage.go | 2 +- dsDataex/MyService/MySwagger/Event.go | 13 +++ dsDataex/MyService/MySwagger/User.go | 10 +++ dsDataex/Utils/ConfigUtil/ConfigUtil.go | 15 +++- dsDataex/Utils/KafkaUtil/KafkaUtil.go | 38 +++++--- dsDataex/docs/docs.go | 90 ++++++++++++++++++- dsDataex/docs/swagger.json | 90 ++++++++++++++++++- dsDataex/docs/swagger.yaml | 67 +++++++++++++- dsDataex/main.go | 12 ++- 12 files changed, 324 insertions(+), 37 deletions(-) create mode 100644 dsDataex/MyService/MySwagger/DataCollect.go create mode 100644 dsDataex/MyService/MySwagger/Event.go create mode 100644 dsDataex/MyService/MySwagger/User.go diff --git a/dsDataex/Config/Config.ini b/dsDataex/Config/Config.ini index a3317518..7cca1287 100644 --- a/dsDataex/Config/Config.ini +++ b/dsDataex/Config/Config.ini @@ -25,7 +25,9 @@ expireTime = 86400 [kafka] -brokers = 192.168.0.200:9092,192.168.0.200:9091 +brokers = 10.10.14.238:9092, +;brokers = 192.168.0.200:9092,192.168.0.200:9091 +replication = 1 diff --git a/dsDataex/MyService/DataEX/DataexOpenapi/DataexOpenapi.go b/dsDataex/MyService/DataEX/DataexOpenapi/DataexOpenapi.go index 5a90486d..76e458bf 100644 --- a/dsDataex/MyService/DataEX/DataexOpenapi/DataexOpenapi.go +++ b/dsDataex/MyService/DataEX/DataexOpenapi/DataexOpenapi.go @@ -73,24 +73,24 @@ func DataexSet(c *gin.Context) { } // @Summary 汇集数据 -// @Description 【数据交换平台】数据汇集服务接口,支持批量入库,一个批次少于100条数据。 +// @Description 【数据交换平台】用户行为数据汇集服务接口,支持用户事件批量入库,一个批次少于100个事件数据。 // @Tags dataex // @Accept json // @Produce json -// @Param input body MySwagger.DataIn true "汇集数据" +// @Param input body MySwagger.DataCollect true "汇集数据" // @Success 200 {object} MySwagger.Result // @Failure 400 {object} MySwagger.Result // @Router /v1/dataex/DataexCollect [post] func DataexCollect(c *gin.Context) { - var input MySwagger.DataIn + var input MySwagger.DataCollect if err := c.ShouldBindJSON(&input); err != nil { c.JSON(http.StatusBadRequest, MySwagger.Result{Success: false,Message: "接入系统数据JSON格式错误"}) return } - if input.SystemID == "" || input.AuthToken == "" { + if input.AccessToken == "" { c.JSON(http.StatusUnauthorized, MySwagger.Result{Success: false,Message: "接入系统票据验证失败"}) return } diff --git a/dsDataex/MyService/MySwagger/DataCollect.go b/dsDataex/MyService/MySwagger/DataCollect.go new file mode 100644 index 00000000..0ff446ea --- /dev/null +++ b/dsDataex/MyService/MySwagger/DataCollect.go @@ -0,0 +1,12 @@ +package MySwagger + +type DataCollect struct { + + AccessToken string `json:"access_token" example:"system_01##20200102030405##a6ce11eab94df48a6ce11eab" ` + DataSource string `json:"data_source" example:"log_login" ` + AccessIP string `json:"access_ip" example:"10.10.8.88"` + AccessWay string `json:"access_way" example:"app/web/weixin/dingding"` + + UserData User `json:"user_data" ` + EventDatas []Event `json:"event_datas" ` +} \ No newline at end of file diff --git a/dsDataex/MyService/MySwagger/DataPage.go b/dsDataex/MyService/MySwagger/DataPage.go index cd75406f..ba0ca586 100644 --- a/dsDataex/MyService/MySwagger/DataPage.go +++ b/dsDataex/MyService/MySwagger/DataPage.go @@ -6,5 +6,5 @@ type DataPage struct { DataSource string `json:"data_source" example:"ORG"` OrgID string `json:"org_id" example:"200201"` QueryTime string `json:"query_time" example:"2020-01-01 02:03:04"` - QueryPage int `json:"query_page" example:"1"` + QueryPage int `json:"query_page" example:"0"` } \ No newline at end of file diff --git a/dsDataex/MyService/MySwagger/Event.go b/dsDataex/MyService/MySwagger/Event.go new file mode 100644 index 00000000..4ab0efde --- /dev/null +++ b/dsDataex/MyService/MySwagger/Event.go @@ -0,0 +1,13 @@ +package MySwagger + +type Event struct { + + EventType string `json:"event_type" example:"1001"` + EventName string `json:"event_name" example:"在线提交作业"` + EventTime string `json:"event_time" example:"2020-08-08 18:19:20"` + EventURI string `json:"event_uri" example:"https://doc.thinkingdata.cn/user_guide"` + EventSeqNO string `json:"event_seqno" example:"事件顺序号:1/2/3"` + EventUserID string `json:"event_userid" example:"用户ID(可选)"` + + Properties string `json:"properties" example:"{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}"` +} diff --git a/dsDataex/MyService/MySwagger/User.go b/dsDataex/MyService/MySwagger/User.go new file mode 100644 index 00000000..b1b892f5 --- /dev/null +++ b/dsDataex/MyService/MySwagger/User.go @@ -0,0 +1,10 @@ +package MySwagger + +type User struct { + + UserID string `json:"user_id" example:"用户ID"` + Identity string `json:"identity" example:"用户身份(1:教师,2:学生,3:家长,4:管理员,5:访客)"` + AccessID string `json:"access_id" example:"访客ID(可选)"` + + Properties string `json:"properties" example:"{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 12:13:14}"` +} diff --git a/dsDataex/Utils/ConfigUtil/ConfigUtil.go b/dsDataex/Utils/ConfigUtil/ConfigUtil.go index 2366e5e1..68795b92 100644 --- a/dsDataex/Utils/ConfigUtil/ConfigUtil.go +++ b/dsDataex/Utils/ConfigUtil/ConfigUtil.go @@ -43,7 +43,12 @@ var ( ProjectPath string //kafka地址 - KafkaAddress string + //KafkaAddress string + + //Kafka broker地址 + KafkaBrokers []string + + KafkaReply int64 //ES Nodes 地址 ESNodes string @@ -54,6 +59,8 @@ var ( ESPWD string + + //Params 系统参数 ROOT_ORGNAME string ) @@ -110,8 +117,12 @@ func init() { ProjectPort = iniParser.GetString("project", "project_port") ProjectGrpc = iniParser.GetString("project", "project_grpc") + //change by zhangjun 2020-07-24 //kafka地址 - KafkaAddress = iniParser.GetString("kafka", "KafkaAddress") + //KafkaAddress = iniParser.GetString("kafka", "KafkaAddress") + KafkaBrokers=strings.Split(iniParser.GetString("kafka", "brokers"),",") + + KafkaReply =iniParser.GetInt64("kafka", "replication") ESNodes = iniParser.GetString("elasticsearch", "nodes") ESUser = iniParser.GetString("elasticsearch", "user") diff --git a/dsDataex/Utils/KafkaUtil/KafkaUtil.go b/dsDataex/Utils/KafkaUtil/KafkaUtil.go index 1f97c23c..3dc20a0e 100644 --- a/dsDataex/Utils/KafkaUtil/KafkaUtil.go +++ b/dsDataex/Utils/KafkaUtil/KafkaUtil.go @@ -3,22 +3,31 @@ package KafkaUtil import ( "bytes" "context" + "dsDataex/Utils/ConfigUtil" "fmt" "github.com/segmentio/kafka-go" + "strconv" "time" ) var KafkaClient *kafka.Conn - +var KafkaBroker string var CTX context.Context func init() { CTX=context.Background() + + KafkaClient, _ = kafka.DialLeader(CTX , "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets" , 0) + + brokers,_:= KafkaClient.Brokers() + + KafkaBroker=brokers[0].Host+":"+strconv.Itoa(brokers[0].Port)+"【"+strconv.Itoa(brokers[0].ID)+"】" } func ProvideLow(topic string) { - KafkaClient, _ = kafka.DialLeader(context.Background(), "tcp", "192.168.0.200:9092", topic , 0) + + KafkaClient, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0) KafkaClient.SetWriteDeadline(time.Now().Add(5 *time.Second)) @@ -31,7 +40,10 @@ func ProvideLow(topic string) { //KafkaClient.Close() } -func ConsumeLow() { +func ConsumeLow(topic string) { + + KafkaClient, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0) + KafkaClient.SetReadDeadline(time.Now().Add(5 *time.Second)) batch := KafkaClient.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max @@ -53,13 +65,13 @@ func ConsumeLow() { //KafkaClient.Close() } -func Consume() { +func Consume(topic string,group string) { r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"192.168.0.200:9092"}, - Topic: "log_login", + Brokers: ConfigUtil.KafkaBrokers , + Topic: topic, //Partition: 0, - GroupID: "Group_04",//必须指定 Group,否则需要指定 Partition!!! + GroupID: group,//必须指定 Group,否则需要指定 Partition!!! MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) @@ -77,11 +89,11 @@ func Consume() { r.Close() } -func Provide(){ +func Provide(topic string){ w := kafka.NewWriter(kafka.WriterConfig{ - Brokers: []string{"192.168.0.200:9092"}, - Topic: "log_login", + Brokers: ConfigUtil.KafkaBrokers, + Topic: topic, Balancer: &kafka.LeastBytes{},//Partition 自动分配器 }) @@ -103,11 +115,11 @@ func Provide(){ func CreateTopic(topic string){ - KafkaClient, _ = kafka.DialLeader(CTX , "tcp", "10.10.14.238:9092", "__consumer_offsets" , 0) + KafkaClient, _ = kafka.DialLeader(CTX , "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets" , 0) err:= KafkaClient.CreateTopics(kafka.TopicConfig{ NumPartitions: 8, - ReplicationFactor: 1, + ReplicationFactor: int(ConfigUtil.KafkaReply), Topic: topic, }) @@ -117,7 +129,7 @@ func CreateTopic(topic string){ } func DeleteTopic(topic string){ - KafkaClient, _ = kafka.DialLeader( CTX, "tcp", "10.10.14.238:9092", topic , 0) + KafkaClient, _ = kafka.DialLeader( CTX, "tcp", ConfigUtil.KafkaBrokers[0], topic , 0) err:= KafkaClient.DeleteTopics(topic) diff --git a/dsDataex/docs/docs.go b/dsDataex/docs/docs.go index 987db844..f70be7c7 100644 --- a/dsDataex/docs/docs.go +++ b/dsDataex/docs/docs.go @@ -35,7 +35,7 @@ var doc = `{ "paths": { "/v1/dataex/DataexCollect": { "post": { - "description": "【数据交换平台】数据汇集服务接口,支持批量入库,一个批次少于100条数据。", + "description": "【数据交换平台】用户行为数据汇集服务接口,支持用户事件批量入库,一个批次少于100个事件数据。", "consumes": [ "application/json" ], @@ -53,7 +53,7 @@ var doc = `{ "in": "body", "required": true, "schema": { - "$ref": "#/definitions/MySwagger.DataIn" + "$ref": "#/definitions/MySwagger.DataCollect" } } ], @@ -1447,6 +1447,36 @@ var doc = `{ } } }, + "MySwagger.DataCollect": { + "type": "object", + "properties": { + "access_ip": { + "type": "string", + "example": "10.10.8.88" + }, + "access_token": { + "type": "string", + "example": "system_01##20200102030405##a6ce11eab94df48a6ce11eab" + }, + "access_way": { + "type": "string", + "example": "app/web/weixin/dingding" + }, + "data_source": { + "type": "string", + "example": "log_login" + }, + "event_datas": { + "type": "array", + "items": { + "$ref": "#/definitions/MySwagger.Event" + } + }, + "user_data": { + "$ref": "#/definitions/MySwagger.User" + } + } + }, "MySwagger.DataGet": { "type": "object", "properties": { @@ -1561,7 +1591,7 @@ var doc = `{ }, "query_page": { "type": "integer", - "example": 1 + "example": 0 }, "query_time": { "type": "string", @@ -1711,6 +1741,39 @@ var doc = `{ } } }, + "MySwagger.Event": { + "type": "object", + "properties": { + "event_name": { + "type": "string", + "example": "在线提交作业" + }, + "event_seqno": { + "type": "string", + "example": "事件顺序号:1/2/3" + }, + "event_time": { + "type": "string", + "example": "2020-08-08 18:19:20" + }, + "event_type": { + "type": "string", + "example": "1001" + }, + "event_uri": { + "type": "string", + "example": "https://doc.thinkingdata.cn/user_guide" + }, + "event_userid": { + "type": "string", + "example": "用户ID(可选)" + }, + "properties": { + "type": "string", + "example": "{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}" + } + } + }, "MySwagger.FailResult": { "type": "object", "properties": { @@ -2068,6 +2131,27 @@ var doc = `{ } } }, + "MySwagger.User": { + "type": "object", + "properties": { + "access_id": { + "type": "string", + "example": "访客ID(可选)" + }, + "identity": { + "type": "string", + "example": "用户身份(1:教师,2:学生,3:家长,4:管理员,5:访客)" + }, + "properties": { + "type": "string", + "example": "{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 12:13:14}" + }, + "user_id": { + "type": "string", + "example": "用户ID" + } + } + }, "dsDataex_MyModel_MySwagger.Result": { "type": "object", "properties": { diff --git a/dsDataex/docs/swagger.json b/dsDataex/docs/swagger.json index f5d04451..6438c228 100644 --- a/dsDataex/docs/swagger.json +++ b/dsDataex/docs/swagger.json @@ -20,7 +20,7 @@ "paths": { "/v1/dataex/DataexCollect": { "post": { - "description": "【数据交换平台】数据汇集服务接口,支持批量入库,一个批次少于100条数据。", + "description": "【数据交换平台】用户行为数据汇集服务接口,支持用户事件批量入库,一个批次少于100个事件数据。", "consumes": [ "application/json" ], @@ -38,7 +38,7 @@ "in": "body", "required": true, "schema": { - "$ref": "#/definitions/MySwagger.DataIn" + "$ref": "#/definitions/MySwagger.DataCollect" } } ], @@ -1432,6 +1432,36 @@ } } }, + "MySwagger.DataCollect": { + "type": "object", + "properties": { + "access_ip": { + "type": "string", + "example": "10.10.8.88" + }, + "access_token": { + "type": "string", + "example": "system_01##20200102030405##a6ce11eab94df48a6ce11eab" + }, + "access_way": { + "type": "string", + "example": "app/web/weixin/dingding" + }, + "data_source": { + "type": "string", + "example": "log_login" + }, + "event_datas": { + "type": "array", + "items": { + "$ref": "#/definitions/MySwagger.Event" + } + }, + "user_data": { + "$ref": "#/definitions/MySwagger.User" + } + } + }, "MySwagger.DataGet": { "type": "object", "properties": { @@ -1546,7 +1576,7 @@ }, "query_page": { "type": "integer", - "example": 1 + "example": 0 }, "query_time": { "type": "string", @@ -1696,6 +1726,39 @@ } } }, + "MySwagger.Event": { + "type": "object", + "properties": { + "event_name": { + "type": "string", + "example": "在线提交作业" + }, + "event_seqno": { + "type": "string", + "example": "事件顺序号:1/2/3" + }, + "event_time": { + "type": "string", + "example": "2020-08-08 18:19:20" + }, + "event_type": { + "type": "string", + "example": "1001" + }, + "event_uri": { + "type": "string", + "example": "https://doc.thinkingdata.cn/user_guide" + }, + "event_userid": { + "type": "string", + "example": "用户ID(可选)" + }, + "properties": { + "type": "string", + "example": "{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}" + } + } + }, "MySwagger.FailResult": { "type": "object", "properties": { @@ -2053,6 +2116,27 @@ } } }, + "MySwagger.User": { + "type": "object", + "properties": { + "access_id": { + "type": "string", + "example": "访客ID(可选)" + }, + "identity": { + "type": "string", + "example": "用户身份(1:教师,2:学生,3:家长,4:管理员,5:访客)" + }, + "properties": { + "type": "string", + "example": "{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 12:13:14}" + }, + "user_id": { + "type": "string", + "example": "用户ID" + } + } + }, "dsDataex_MyModel_MySwagger.Result": { "type": "object", "properties": { diff --git a/dsDataex/docs/swagger.yaml b/dsDataex/docs/swagger.yaml index 20a20697..8b8871d4 100644 --- a/dsDataex/docs/swagger.yaml +++ b/dsDataex/docs/swagger.yaml @@ -27,6 +27,27 @@ definitions: example: "200201" type: string type: object + MySwagger.DataCollect: + properties: + access_ip: + example: 10.10.8.88 + type: string + access_token: + example: system_01##20200102030405##a6ce11eab94df48a6ce11eab + type: string + access_way: + example: app/web/weixin/dingding + type: string + data_source: + example: log_login + type: string + event_datas: + items: + $ref: '#/definitions/MySwagger.Event' + type: array + user_data: + $ref: '#/definitions/MySwagger.User' + type: object MySwagger.DataGet: properties: auth_token: @@ -107,7 +128,7 @@ definitions: example: "200201" type: string query_page: - example: 1 + example: 0 type: integer query_time: example: "2020-01-01 02:03:04" @@ -210,6 +231,30 @@ definitions: system_id: type: string type: object + MySwagger.Event: + properties: + event_name: + example: 在线提交作业 + type: string + event_seqno: + example: 事件顺序号:1/2/3 + type: string + event_time: + example: "2020-08-08 18:19:20" + type: string + event_type: + example: "1001" + type: string + event_uri: + example: https://doc.thinkingdata.cn/user_guide + type: string + event_userid: + example: 用户ID(可选) + type: string + properties: + example: '{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}' + type: string + type: object MySwagger.FailResult: properties: fail_id: @@ -452,6 +497,22 @@ definitions: type: string type: array type: object + MySwagger.User: + properties: + access_id: + example: 访客ID(可选) + type: string + identity: + example: 用户身份(1:教师,2:学生,3:家长,4:管理员,5:访客) + type: string + properties: + example: '{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 + 12:13:14}' + type: string + user_id: + example: 用户ID + type: string + type: object dsDataex_MyModel_MySwagger.Result: properties: data: @@ -496,14 +557,14 @@ paths: post: consumes: - application/json - description: 【数据交换平台】数据汇集服务接口,支持批量入库,一个批次少于100条数据。 + description: 【数据交换平台】用户行为数据汇集服务接口,支持用户事件批量入库,一个批次少于100个事件数据。 parameters: - description: 汇集数据 in: body name: input required: true schema: - $ref: '#/definitions/MySwagger.DataIn' + $ref: '#/definitions/MySwagger.DataCollect' produces: - application/json responses: diff --git a/dsDataex/main.go b/dsDataex/main.go index 695a354f..155a9825 100644 --- a/dsDataex/main.go +++ b/dsDataex/main.go @@ -15,8 +15,8 @@ import ( "dsDataex/Utils" "dsDataex/Utils/CacheUtil" "dsDataex/Utils/ConfigUtil" - "dsDataex/Utils/ES7SqlUtil" "dsDataex/Utils/ES7Util" + "dsDataex/Utils/KafkaUtil" _ "dsDataex/docs" "fmt" "github.com/gin-gonic/gin" @@ -111,16 +111,14 @@ func main() { fmt.Println("Dsideal DataEX GO GO GO !!!") fmt.Println("ES Server :" + ES7Util.ServerVersion) - fmt.Println("ES Server :" + ES7SqlUtil.ServerVersion) - - CacheUtil.OrgtreeCacheInit() - - //KafkaUtil.Provide() - //KafkaUtil.Consume() + //fmt.Println("ES Server :" + ES7SqlUtil.ServerVersion) + fmt.Println("Kafka Server :" + KafkaUtil.KafkaBroker ) //KafkaUtil.CreateTopic("log_12345") //KafkaUtil.DeleteTopic("log_12345") + CacheUtil.OrgtreeCacheInit() + GinServerInit() g.Go(func() error {