增加 DataCollect 相关结构

master
zhangjun 5 years ago
parent 74078e3f10
commit 3a786c6987

@ -25,7 +25,9 @@ expireTime = 86400
[kafka]
brokers = 192.168.0.200:9092,192.168.0.200:9091
brokers = 10.10.14.238:9092,
;brokers = 192.168.0.200:9092,192.168.0.200:9091
replication = 1

@ -73,24 +73,24 @@ func DataexSet(c *gin.Context) {
}
// @Summary 汇集数据
// @Description 【数据交换平台】数据汇集服务接口支持批量入库一个批次少于100条数据。
// @Description 【数据交换平台】用户行为数据汇集服务接口支持用户事件批量入库一个批次少于100个事件数据。
// @Tags dataex
// @Accept json
// @Produce json
// @Param input body MySwagger.DataIn true "汇集数据"
// @Param input body MySwagger.DataCollect true "汇集数据"
// @Success 200 {object} MySwagger.Result
// @Failure 400 {object} MySwagger.Result
// @Router /v1/dataex/DataexCollect [post]
func DataexCollect(c *gin.Context) {
var input MySwagger.DataIn
var input MySwagger.DataCollect
if err := c.ShouldBindJSON(&input); err != nil {
c.JSON(http.StatusBadRequest, MySwagger.Result{Success: false,Message: "接入系统数据JSON格式错误"})
return
}
if input.SystemID == "" || input.AuthToken == "" {
if input.AccessToken == "" {
c.JSON(http.StatusUnauthorized, MySwagger.Result{Success: false,Message: "接入系统票据验证失败"})
return
}

@ -0,0 +1,12 @@
package MySwagger
type DataCollect struct {
AccessToken string `json:"access_token" example:"system_01##20200102030405##a6ce11eab94df48a6ce11eab" `
DataSource string `json:"data_source" example:"log_login" `
AccessIP string `json:"access_ip" example:"10.10.8.88"`
AccessWay string `json:"access_way" example:"app/web/weixin/dingding"`
UserData User `json:"user_data" `
EventDatas []Event `json:"event_datas" `
}

@ -6,5 +6,5 @@ type DataPage struct {
DataSource string `json:"data_source" example:"ORG"`
OrgID string `json:"org_id" example:"200201"`
QueryTime string `json:"query_time" example:"2020-01-01 02:03:04"`
QueryPage int `json:"query_page" example:"1"`
QueryPage int `json:"query_page" example:"0"`
}

@ -0,0 +1,13 @@
package MySwagger
type Event struct {
EventType string `json:"event_type" example:"1001"`
EventName string `json:"event_name" example:"在线提交作业"`
EventTime string `json:"event_time" example:"2020-08-08 18:19:20"`
EventURI string `json:"event_uri" example:"https://doc.thinkingdata.cn/user_guide"`
EventSeqNO string `json:"event_seqno" example:"事件顺序号1/2/3"`
EventUserID string `json:"event_userid" example:"用户ID(可选)"`
Properties string `json:"properties" example:"{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}"`
}

@ -0,0 +1,10 @@
package MySwagger
type User struct {
UserID string `json:"user_id" example:"用户ID"`
Identity string `json:"identity" example:"用户身份1教师2学生3家长4管理员5访客"`
AccessID string `json:"access_id" example:"访客ID可选"`
Properties string `json:"properties" example:"{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 12:13:14}"`
}

@ -43,7 +43,12 @@ var (
ProjectPath string
//kafka地址
KafkaAddress string
//KafkaAddress string
//Kafka broker地址
KafkaBrokers []string
KafkaReply int64
//ES Nodes 地址
ESNodes string
@ -54,6 +59,8 @@ var (
ESPWD string
//Params 系统参数
ROOT_ORGNAME string
)
@ -110,8 +117,12 @@ func init() {
ProjectPort = iniParser.GetString("project", "project_port")
ProjectGrpc = iniParser.GetString("project", "project_grpc")
//change by zhangjun 2020-07-24
//kafka地址
KafkaAddress = iniParser.GetString("kafka", "KafkaAddress")
//KafkaAddress = iniParser.GetString("kafka", "KafkaAddress")
KafkaBrokers=strings.Split(iniParser.GetString("kafka", "brokers"),",")
KafkaReply =iniParser.GetInt64("kafka", "replication")
ESNodes = iniParser.GetString("elasticsearch", "nodes")
ESUser = iniParser.GetString("elasticsearch", "user")

@ -3,22 +3,31 @@ package KafkaUtil
import (
"bytes"
"context"
"dsDataex/Utils/ConfigUtil"
"fmt"
"github.com/segmentio/kafka-go"
"strconv"
"time"
)
var KafkaClient *kafka.Conn
var KafkaBroker string
var CTX context.Context
func init() {
CTX=context.Background()
KafkaClient, _ = kafka.DialLeader(CTX , "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets" , 0)
brokers,_:= KafkaClient.Brokers()
KafkaBroker=brokers[0].Host+":"+strconv.Itoa(brokers[0].Port)+"【"+strconv.Itoa(brokers[0].ID)+"】"
}
func ProvideLow(topic string) {
KafkaClient, _ = kafka.DialLeader(context.Background(), "tcp", "192.168.0.200:9092", topic , 0)
KafkaClient, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0)
KafkaClient.SetWriteDeadline(time.Now().Add(5 *time.Second))
@ -31,7 +40,10 @@ func ProvideLow(topic string) {
//KafkaClient.Close()
}
func ConsumeLow() {
func ConsumeLow(topic string) {
KafkaClient, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0)
KafkaClient.SetReadDeadline(time.Now().Add(5 *time.Second))
batch := KafkaClient.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
@ -53,13 +65,13 @@ func ConsumeLow() {
//KafkaClient.Close()
}
func Consume() {
func Consume(topic string,group string) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"192.168.0.200:9092"},
Topic: "log_login",
Brokers: ConfigUtil.KafkaBrokers ,
Topic: topic,
//Partition: 0,
GroupID: "Group_04",//必须指定 Group否则需要指定 Partition
GroupID: group,//必须指定 Group否则需要指定 Partition
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
@ -77,11 +89,11 @@ func Consume() {
r.Close()
}
func Provide(){
func Provide(topic string){
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"192.168.0.200:9092"},
Topic: "log_login",
Brokers: ConfigUtil.KafkaBrokers,
Topic: topic,
Balancer: &kafka.LeastBytes{},//Partition 自动分配器
})
@ -103,11 +115,11 @@ func Provide(){
func CreateTopic(topic string){
KafkaClient, _ = kafka.DialLeader(CTX , "tcp", "10.10.14.238:9092", "__consumer_offsets" , 0)
KafkaClient, _ = kafka.DialLeader(CTX , "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets" , 0)
err:= KafkaClient.CreateTopics(kafka.TopicConfig{
NumPartitions: 8,
ReplicationFactor: 1,
ReplicationFactor: int(ConfigUtil.KafkaReply),
Topic: topic,
})
@ -117,7 +129,7 @@ func CreateTopic(topic string){
}
func DeleteTopic(topic string){
KafkaClient, _ = kafka.DialLeader( CTX, "tcp", "10.10.14.238:9092", topic , 0)
KafkaClient, _ = kafka.DialLeader( CTX, "tcp", ConfigUtil.KafkaBrokers[0], topic , 0)
err:= KafkaClient.DeleteTopics(topic)

@ -35,7 +35,7 @@ var doc = `{
"paths": {
"/v1/dataex/DataexCollect": {
"post": {
"description": "【数据交换平台】数据汇集服务接口支持批量入库一个批次少于100条数据。",
"description": "【数据交换平台】用户行为数据汇集服务接口支持用户事件批量入库一个批次少于100个事件数据。",
"consumes": [
"application/json"
],
@ -53,7 +53,7 @@ var doc = `{
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/MySwagger.DataIn"
"$ref": "#/definitions/MySwagger.DataCollect"
}
}
],
@ -1447,6 +1447,36 @@ var doc = `{
}
}
},
"MySwagger.DataCollect": {
"type": "object",
"properties": {
"access_ip": {
"type": "string",
"example": "10.10.8.88"
},
"access_token": {
"type": "string",
"example": "system_01##20200102030405##a6ce11eab94df48a6ce11eab"
},
"access_way": {
"type": "string",
"example": "app/web/weixin/dingding"
},
"data_source": {
"type": "string",
"example": "log_login"
},
"event_datas": {
"type": "array",
"items": {
"$ref": "#/definitions/MySwagger.Event"
}
},
"user_data": {
"$ref": "#/definitions/MySwagger.User"
}
}
},
"MySwagger.DataGet": {
"type": "object",
"properties": {
@ -1561,7 +1591,7 @@ var doc = `{
},
"query_page": {
"type": "integer",
"example": 1
"example": 0
},
"query_time": {
"type": "string",
@ -1711,6 +1741,39 @@ var doc = `{
}
}
},
"MySwagger.Event": {
"type": "object",
"properties": {
"event_name": {
"type": "string",
"example": "在线提交作业"
},
"event_seqno": {
"type": "string",
"example": "事件顺序号1/2/3"
},
"event_time": {
"type": "string",
"example": "2020-08-08 18:19:20"
},
"event_type": {
"type": "string",
"example": "1001"
},
"event_uri": {
"type": "string",
"example": "https://doc.thinkingdata.cn/user_guide"
},
"event_userid": {
"type": "string",
"example": "用户ID(可选)"
},
"properties": {
"type": "string",
"example": "{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}"
}
}
},
"MySwagger.FailResult": {
"type": "object",
"properties": {
@ -2068,6 +2131,27 @@ var doc = `{
}
}
},
"MySwagger.User": {
"type": "object",
"properties": {
"access_id": {
"type": "string",
"example": "访客ID可选"
},
"identity": {
"type": "string",
"example": "用户身份1教师2学生3家长4管理员5访客"
},
"properties": {
"type": "string",
"example": "{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 12:13:14}"
},
"user_id": {
"type": "string",
"example": "用户ID"
}
}
},
"dsDataex_MyModel_MySwagger.Result": {
"type": "object",
"properties": {

@ -20,7 +20,7 @@
"paths": {
"/v1/dataex/DataexCollect": {
"post": {
"description": "【数据交换平台】数据汇集服务接口支持批量入库一个批次少于100条数据。",
"description": "【数据交换平台】用户行为数据汇集服务接口支持用户事件批量入库一个批次少于100个事件数据。",
"consumes": [
"application/json"
],
@ -38,7 +38,7 @@
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/MySwagger.DataIn"
"$ref": "#/definitions/MySwagger.DataCollect"
}
}
],
@ -1432,6 +1432,36 @@
}
}
},
"MySwagger.DataCollect": {
"type": "object",
"properties": {
"access_ip": {
"type": "string",
"example": "10.10.8.88"
},
"access_token": {
"type": "string",
"example": "system_01##20200102030405##a6ce11eab94df48a6ce11eab"
},
"access_way": {
"type": "string",
"example": "app/web/weixin/dingding"
},
"data_source": {
"type": "string",
"example": "log_login"
},
"event_datas": {
"type": "array",
"items": {
"$ref": "#/definitions/MySwagger.Event"
}
},
"user_data": {
"$ref": "#/definitions/MySwagger.User"
}
}
},
"MySwagger.DataGet": {
"type": "object",
"properties": {
@ -1546,7 +1576,7 @@
},
"query_page": {
"type": "integer",
"example": 1
"example": 0
},
"query_time": {
"type": "string",
@ -1696,6 +1726,39 @@
}
}
},
"MySwagger.Event": {
"type": "object",
"properties": {
"event_name": {
"type": "string",
"example": "在线提交作业"
},
"event_seqno": {
"type": "string",
"example": "事件顺序号1/2/3"
},
"event_time": {
"type": "string",
"example": "2020-08-08 18:19:20"
},
"event_type": {
"type": "string",
"example": "1001"
},
"event_uri": {
"type": "string",
"example": "https://doc.thinkingdata.cn/user_guide"
},
"event_userid": {
"type": "string",
"example": "用户ID(可选)"
},
"properties": {
"type": "string",
"example": "{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}"
}
}
},
"MySwagger.FailResult": {
"type": "object",
"properties": {
@ -2053,6 +2116,27 @@
}
}
},
"MySwagger.User": {
"type": "object",
"properties": {
"access_id": {
"type": "string",
"example": "访客ID可选"
},
"identity": {
"type": "string",
"example": "用户身份1教师2学生3家长4管理员5访客"
},
"properties": {
"type": "string",
"example": "{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 12:13:14}"
},
"user_id": {
"type": "string",
"example": "用户ID"
}
}
},
"dsDataex_MyModel_MySwagger.Result": {
"type": "object",
"properties": {

@ -27,6 +27,27 @@ definitions:
example: "200201"
type: string
type: object
MySwagger.DataCollect:
properties:
access_ip:
example: 10.10.8.88
type: string
access_token:
example: system_01##20200102030405##a6ce11eab94df48a6ce11eab
type: string
access_way:
example: app/web/weixin/dingding
type: string
data_source:
example: log_login
type: string
event_datas:
items:
$ref: '#/definitions/MySwagger.Event'
type: array
user_data:
$ref: '#/definitions/MySwagger.User'
type: object
MySwagger.DataGet:
properties:
auth_token:
@ -107,7 +128,7 @@ definitions:
example: "200201"
type: string
query_page:
example: 1
example: 0
type: integer
query_time:
example: "2020-01-01 02:03:04"
@ -210,6 +231,30 @@ definitions:
system_id:
type: string
type: object
MySwagger.Event:
properties:
event_name:
example: 在线提交作业
type: string
event_seqno:
example: 事件顺序号1/2/3
type: string
event_time:
example: "2020-08-08 18:19:20"
type: string
event_type:
example: "1001"
type: string
event_uri:
example: https://doc.thinkingdata.cn/user_guide
type: string
event_userid:
example: 用户ID(可选)
type: string
properties:
example: '{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}'
type: string
type: object
MySwagger.FailResult:
properties:
fail_id:
@ -452,6 +497,22 @@ definitions:
type: string
type: array
type: object
MySwagger.User:
properties:
access_id:
example: 访客ID可选
type: string
identity:
example: 用户身份1教师2学生3家长4管理员5访客
type: string
properties:
example: '{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08
12:13:14}'
type: string
user_id:
example: 用户ID
type: string
type: object
dsDataex_MyModel_MySwagger.Result:
properties:
data:
@ -496,14 +557,14 @@ paths:
post:
consumes:
- application/json
description: 【数据交换平台】数据汇集服务接口支持批量入库一个批次少于100条数据。
description: 【数据交换平台】用户行为数据汇集服务接口支持用户事件批量入库一个批次少于100个事件数据。
parameters:
- description: 汇集数据
in: body
name: input
required: true
schema:
$ref: '#/definitions/MySwagger.DataIn'
$ref: '#/definitions/MySwagger.DataCollect'
produces:
- application/json
responses:

@ -15,8 +15,8 @@ import (
"dsDataex/Utils"
"dsDataex/Utils/CacheUtil"
"dsDataex/Utils/ConfigUtil"
"dsDataex/Utils/ES7SqlUtil"
"dsDataex/Utils/ES7Util"
"dsDataex/Utils/KafkaUtil"
_ "dsDataex/docs"
"fmt"
"github.com/gin-gonic/gin"
@ -111,16 +111,14 @@ func main() {
fmt.Println("Dsideal DataEX GO GO GO !!!")
fmt.Println("ES Server :" + ES7Util.ServerVersion)
fmt.Println("ES Server :" + ES7SqlUtil.ServerVersion)
CacheUtil.OrgtreeCacheInit()
//KafkaUtil.Provide()
//KafkaUtil.Consume()
//fmt.Println("ES Server :" + ES7SqlUtil.ServerVersion)
fmt.Println("Kafka Server :" + KafkaUtil.KafkaBroker )
//KafkaUtil.CreateTopic("log_12345")
//KafkaUtil.DeleteTopic("log_12345")
CacheUtil.OrgtreeCacheInit()
GinServerInit()
g.Go(func() error {

Loading…
Cancel
Save