Merge branch 'master' of 10.10.14.250:huanghai/dsMin

master
huanghai 5 years ago
commit f81b5eb532

@ -25,7 +25,9 @@ expireTime = 86400
[kafka] [kafka]
brokers = 192.168.0.200:9092,192.168.0.200:9091 brokers = 10.10.14.238:9092,
;brokers = 192.168.0.200:9092,192.168.0.200:9091
replication = 1

@ -73,24 +73,24 @@ func DataexSet(c *gin.Context) {
} }
// @Summary 汇集数据 // @Summary 汇集数据
// @Description 【数据交换平台】数据汇集服务接口支持批量入库一个批次少于100条数据。 // @Description 【数据交换平台】用户行为数据汇集服务接口支持用户事件批量入库一个批次少于100个事件数据。
// @Tags dataex // @Tags dataex
// @Accept json // @Accept json
// @Produce json // @Produce json
// @Param input body MySwagger.DataIn true "汇集数据" // @Param input body MySwagger.DataCollect true "汇集数据"
// @Success 200 {object} MySwagger.Result // @Success 200 {object} MySwagger.Result
// @Failure 400 {object} MySwagger.Result // @Failure 400 {object} MySwagger.Result
// @Router /v1/dataex/DataexCollect [post] // @Router /v1/dataex/DataexCollect [post]
func DataexCollect(c *gin.Context) { func DataexCollect(c *gin.Context) {
var input MySwagger.DataIn var input MySwagger.DataCollect
if err := c.ShouldBindJSON(&input); err != nil { if err := c.ShouldBindJSON(&input); err != nil {
c.JSON(http.StatusBadRequest, MySwagger.Result{Success: false,Message: "接入系统数据JSON格式错误"}) c.JSON(http.StatusBadRequest, MySwagger.Result{Success: false,Message: "接入系统数据JSON格式错误"})
return return
} }
if input.SystemID == "" || input.AuthToken == "" { if input.AccessToken == "" {
c.JSON(http.StatusUnauthorized, MySwagger.Result{Success: false,Message: "接入系统票据验证失败"}) c.JSON(http.StatusUnauthorized, MySwagger.Result{Success: false,Message: "接入系统票据验证失败"})
return return
} }

@ -0,0 +1,12 @@
package MySwagger
type DataCollect struct {
AccessToken string `json:"access_token" example:"system_01##20200102030405##a6ce11eab94df48a6ce11eab" `
DataSource string `json:"data_source" example:"log_login" `
AccessIP string `json:"access_ip" example:"10.10.8.88"`
AccessWay string `json:"access_way" example:"app/web/weixin/dingding"`
UserData User `json:"user_data" `
EventDatas []Event `json:"event_datas" `
}

@ -6,5 +6,5 @@ type DataPage struct {
DataSource string `json:"data_source" example:"ORG"` DataSource string `json:"data_source" example:"ORG"`
OrgID string `json:"org_id" example:"200201"` OrgID string `json:"org_id" example:"200201"`
QueryTime string `json:"query_time" example:"2020-01-01 02:03:04"` QueryTime string `json:"query_time" example:"2020-01-01 02:03:04"`
QueryPage int `json:"query_page" example:"1"` QueryPage int `json:"query_page" example:"0"`
} }

@ -0,0 +1,13 @@
package MySwagger
type Event struct {
EventType string `json:"event_type" example:"1001"`
EventName string `json:"event_name" example:"在线提交作业"`
EventTime string `json:"event_time" example:"2020-08-08 18:19:20"`
EventURI string `json:"event_uri" example:"https://doc.thinkingdata.cn/user_guide"`
EventSeqNO string `json:"event_seqno" example:"事件顺序号1/2/3"`
EventUserID string `json:"event_userid" example:"用户ID(可选)"`
Properties string `json:"properties" example:"{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}"`
}

@ -0,0 +1,10 @@
package MySwagger
type User struct {
UserID string `json:"user_id" example:"用户ID"`
Identity string `json:"identity" example:"用户身份1教师2学生3家长4管理员5访客"`
AccessID string `json:"access_id" example:"访客ID可选"`
Properties string `json:"properties" example:"{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 12:13:14}"`
}

@ -43,7 +43,12 @@ var (
ProjectPath string ProjectPath string
//kafka地址 //kafka地址
KafkaAddress string //KafkaAddress string
//Kafka broker地址
KafkaBrokers []string
KafkaReply int64
//ES Nodes 地址 //ES Nodes 地址
ESNodes string ESNodes string
@ -54,6 +59,8 @@ var (
ESPWD string ESPWD string
//Params 系统参数 //Params 系统参数
ROOT_ORGNAME string ROOT_ORGNAME string
) )
@ -110,8 +117,12 @@ func init() {
ProjectPort = iniParser.GetString("project", "project_port") ProjectPort = iniParser.GetString("project", "project_port")
ProjectGrpc = iniParser.GetString("project", "project_grpc") ProjectGrpc = iniParser.GetString("project", "project_grpc")
//change by zhangjun 2020-07-24
//kafka地址 //kafka地址
KafkaAddress = iniParser.GetString("kafka", "KafkaAddress") //KafkaAddress = iniParser.GetString("kafka", "KafkaAddress")
KafkaBrokers=strings.Split(iniParser.GetString("kafka", "brokers"),",")
KafkaReply =iniParser.GetInt64("kafka", "replication")
ESNodes = iniParser.GetString("elasticsearch", "nodes") ESNodes = iniParser.GetString("elasticsearch", "nodes")
ESUser = iniParser.GetString("elasticsearch", "user") ESUser = iniParser.GetString("elasticsearch", "user")

@ -3,22 +3,31 @@ package KafkaUtil
import ( import (
"bytes" "bytes"
"context" "context"
"dsDataex/Utils/ConfigUtil"
"fmt" "fmt"
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
"strconv"
"time" "time"
) )
var KafkaClient *kafka.Conn var KafkaClient *kafka.Conn
var KafkaBroker string
var CTX context.Context var CTX context.Context
func init() { func init() {
CTX=context.Background() CTX=context.Background()
KafkaClient, _ = kafka.DialLeader(CTX , "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets" , 0)
brokers,_:= KafkaClient.Brokers()
KafkaBroker=brokers[0].Host+":"+strconv.Itoa(brokers[0].Port)+"【"+strconv.Itoa(brokers[0].ID)+"】"
} }
func ProvideLow(topic string) { func ProvideLow(topic string) {
KafkaClient, _ = kafka.DialLeader(context.Background(), "tcp", "192.168.0.200:9092", topic , 0)
KafkaClient, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0)
KafkaClient.SetWriteDeadline(time.Now().Add(5 *time.Second)) KafkaClient.SetWriteDeadline(time.Now().Add(5 *time.Second))
@ -31,7 +40,10 @@ func ProvideLow(topic string) {
//KafkaClient.Close() //KafkaClient.Close()
} }
func ConsumeLow() { func ConsumeLow(topic string) {
KafkaClient, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic , 0)
KafkaClient.SetReadDeadline(time.Now().Add(5 *time.Second)) KafkaClient.SetReadDeadline(time.Now().Add(5 *time.Second))
batch := KafkaClient.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max batch := KafkaClient.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
@ -53,13 +65,13 @@ func ConsumeLow() {
//KafkaClient.Close() //KafkaClient.Close()
} }
func Consume() { func Consume(topic string,group string) {
r := kafka.NewReader(kafka.ReaderConfig{ r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"192.168.0.200:9092"}, Brokers: ConfigUtil.KafkaBrokers ,
Topic: "log_login", Topic: topic,
//Partition: 0, //Partition: 0,
GroupID: "Group_04",//必须指定 Group否则需要指定 Partition GroupID: group,//必须指定 Group否则需要指定 Partition
MinBytes: 10e3, // 10KB MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB MaxBytes: 10e6, // 10MB
}) })
@ -77,11 +89,11 @@ func Consume() {
r.Close() r.Close()
} }
func Provide(){ func Provide(topic string){
w := kafka.NewWriter(kafka.WriterConfig{ w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"192.168.0.200:9092"}, Brokers: ConfigUtil.KafkaBrokers,
Topic: "log_login", Topic: topic,
Balancer: &kafka.LeastBytes{},//Partition 自动分配器 Balancer: &kafka.LeastBytes{},//Partition 自动分配器
}) })
@ -103,11 +115,11 @@ func Provide(){
func CreateTopic(topic string){ func CreateTopic(topic string){
KafkaClient, _ = kafka.DialLeader(CTX , "tcp", "10.10.14.238:9092", "__consumer_offsets" , 0) KafkaClient, _ = kafka.DialLeader(CTX , "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets" , 0)
err:= KafkaClient.CreateTopics(kafka.TopicConfig{ err:= KafkaClient.CreateTopics(kafka.TopicConfig{
NumPartitions: 8, NumPartitions: 8,
ReplicationFactor: 1, ReplicationFactor: int(ConfigUtil.KafkaReply),
Topic: topic, Topic: topic,
}) })
@ -117,7 +129,7 @@ func CreateTopic(topic string){
} }
func DeleteTopic(topic string){ func DeleteTopic(topic string){
KafkaClient, _ = kafka.DialLeader( CTX, "tcp", "10.10.14.238:9092", topic , 0) KafkaClient, _ = kafka.DialLeader( CTX, "tcp", ConfigUtil.KafkaBrokers[0], topic , 0)
err:= KafkaClient.DeleteTopics(topic) err:= KafkaClient.DeleteTopics(topic)

@ -35,7 +35,7 @@ var doc = `{
"paths": { "paths": {
"/v1/dataex/DataexCollect": { "/v1/dataex/DataexCollect": {
"post": { "post": {
"description": "【数据交换平台】数据汇集服务接口支持批量入库一个批次少于100条数据。", "description": "【数据交换平台】用户行为数据汇集服务接口支持用户事件批量入库一个批次少于100个事件数据。",
"consumes": [ "consumes": [
"application/json" "application/json"
], ],
@ -53,7 +53,7 @@ var doc = `{
"in": "body", "in": "body",
"required": true, "required": true,
"schema": { "schema": {
"$ref": "#/definitions/MySwagger.DataIn" "$ref": "#/definitions/MySwagger.DataCollect"
} }
} }
], ],
@ -1447,6 +1447,36 @@ var doc = `{
} }
} }
}, },
"MySwagger.DataCollect": {
"type": "object",
"properties": {
"access_ip": {
"type": "string",
"example": "10.10.8.88"
},
"access_token": {
"type": "string",
"example": "system_01##20200102030405##a6ce11eab94df48a6ce11eab"
},
"access_way": {
"type": "string",
"example": "app/web/weixin/dingding"
},
"data_source": {
"type": "string",
"example": "log_login"
},
"event_datas": {
"type": "array",
"items": {
"$ref": "#/definitions/MySwagger.Event"
}
},
"user_data": {
"$ref": "#/definitions/MySwagger.User"
}
}
},
"MySwagger.DataGet": { "MySwagger.DataGet": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -1561,7 +1591,7 @@ var doc = `{
}, },
"query_page": { "query_page": {
"type": "integer", "type": "integer",
"example": 1 "example": 0
}, },
"query_time": { "query_time": {
"type": "string", "type": "string",
@ -1711,6 +1741,39 @@ var doc = `{
} }
} }
}, },
"MySwagger.Event": {
"type": "object",
"properties": {
"event_name": {
"type": "string",
"example": "在线提交作业"
},
"event_seqno": {
"type": "string",
"example": "事件顺序号1/2/3"
},
"event_time": {
"type": "string",
"example": "2020-08-08 18:19:20"
},
"event_type": {
"type": "string",
"example": "1001"
},
"event_uri": {
"type": "string",
"example": "https://doc.thinkingdata.cn/user_guide"
},
"event_userid": {
"type": "string",
"example": "用户ID(可选)"
},
"properties": {
"type": "string",
"example": "{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}"
}
}
},
"MySwagger.FailResult": { "MySwagger.FailResult": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -2068,6 +2131,27 @@ var doc = `{
} }
} }
}, },
"MySwagger.User": {
"type": "object",
"properties": {
"access_id": {
"type": "string",
"example": "访客ID可选"
},
"identity": {
"type": "string",
"example": "用户身份1教师2学生3家长4管理员5访客"
},
"properties": {
"type": "string",
"example": "{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 12:13:14}"
},
"user_id": {
"type": "string",
"example": "用户ID"
}
}
},
"dsDataex_MyModel_MySwagger.Result": { "dsDataex_MyModel_MySwagger.Result": {
"type": "object", "type": "object",
"properties": { "properties": {

@ -20,7 +20,7 @@
"paths": { "paths": {
"/v1/dataex/DataexCollect": { "/v1/dataex/DataexCollect": {
"post": { "post": {
"description": "【数据交换平台】数据汇集服务接口支持批量入库一个批次少于100条数据。", "description": "【数据交换平台】用户行为数据汇集服务接口支持用户事件批量入库一个批次少于100个事件数据。",
"consumes": [ "consumes": [
"application/json" "application/json"
], ],
@ -38,7 +38,7 @@
"in": "body", "in": "body",
"required": true, "required": true,
"schema": { "schema": {
"$ref": "#/definitions/MySwagger.DataIn" "$ref": "#/definitions/MySwagger.DataCollect"
} }
} }
], ],
@ -1432,6 +1432,36 @@
} }
} }
}, },
"MySwagger.DataCollect": {
"type": "object",
"properties": {
"access_ip": {
"type": "string",
"example": "10.10.8.88"
},
"access_token": {
"type": "string",
"example": "system_01##20200102030405##a6ce11eab94df48a6ce11eab"
},
"access_way": {
"type": "string",
"example": "app/web/weixin/dingding"
},
"data_source": {
"type": "string",
"example": "log_login"
},
"event_datas": {
"type": "array",
"items": {
"$ref": "#/definitions/MySwagger.Event"
}
},
"user_data": {
"$ref": "#/definitions/MySwagger.User"
}
}
},
"MySwagger.DataGet": { "MySwagger.DataGet": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -1546,7 +1576,7 @@
}, },
"query_page": { "query_page": {
"type": "integer", "type": "integer",
"example": 1 "example": 0
}, },
"query_time": { "query_time": {
"type": "string", "type": "string",
@ -1696,6 +1726,39 @@
} }
} }
}, },
"MySwagger.Event": {
"type": "object",
"properties": {
"event_name": {
"type": "string",
"example": "在线提交作业"
},
"event_seqno": {
"type": "string",
"example": "事件顺序号1/2/3"
},
"event_time": {
"type": "string",
"example": "2020-08-08 18:19:20"
},
"event_type": {
"type": "string",
"example": "1001"
},
"event_uri": {
"type": "string",
"example": "https://doc.thinkingdata.cn/user_guide"
},
"event_userid": {
"type": "string",
"example": "用户ID(可选)"
},
"properties": {
"type": "string",
"example": "{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}"
}
}
},
"MySwagger.FailResult": { "MySwagger.FailResult": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -2053,6 +2116,27 @@
} }
} }
}, },
"MySwagger.User": {
"type": "object",
"properties": {
"access_id": {
"type": "string",
"example": "访客ID可选"
},
"identity": {
"type": "string",
"example": "用户身份1教师2学生3家长4管理员5访客"
},
"properties": {
"type": "string",
"example": "{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08 12:13:14}"
},
"user_id": {
"type": "string",
"example": "用户ID"
}
}
},
"dsDataex_MyModel_MySwagger.Result": { "dsDataex_MyModel_MySwagger.Result": {
"type": "object", "type": "object",
"properties": { "properties": {

@ -27,6 +27,27 @@ definitions:
example: "200201" example: "200201"
type: string type: string
type: object type: object
MySwagger.DataCollect:
properties:
access_ip:
example: 10.10.8.88
type: string
access_token:
example: system_01##20200102030405##a6ce11eab94df48a6ce11eab
type: string
access_way:
example: app/web/weixin/dingding
type: string
data_source:
example: log_login
type: string
event_datas:
items:
$ref: '#/definitions/MySwagger.Event'
type: array
user_data:
$ref: '#/definitions/MySwagger.User'
type: object
MySwagger.DataGet: MySwagger.DataGet:
properties: properties:
auth_token: auth_token:
@ -107,7 +128,7 @@ definitions:
example: "200201" example: "200201"
type: string type: string
query_page: query_page:
example: 1 example: 0
type: integer type: integer
query_time: query_time:
example: "2020-01-01 02:03:04" example: "2020-01-01 02:03:04"
@ -210,6 +231,30 @@ definitions:
system_id: system_id:
type: string type: string
type: object type: object
MySwagger.Event:
properties:
event_name:
example: 在线提交作业
type: string
event_seqno:
example: 事件顺序号1/2/3
type: string
event_time:
example: "2020-08-08 18:19:20"
type: string
event_type:
example: "1001"
type: string
event_uri:
example: https://doc.thinkingdata.cn/user_guide
type: string
event_userid:
example: 用户ID(可选)
type: string
properties:
example: '{res_id:12300,res_name:初一语文一单元作业,teacher_id:123001,time_spend:100s}'
type: string
type: object
MySwagger.FailResult: MySwagger.FailResult:
properties: properties:
fail_id: fail_id:
@ -452,6 +497,22 @@ definitions:
type: string type: string
type: array type: array
type: object type: object
MySwagger.User:
properties:
access_id:
example: 访客ID可选
type: string
identity:
example: 用户身份1教师2学生3家长4管理员5访客
type: string
properties:
example: '{group_id:12300,group_name:课外学习一组,user_level:2,login_time:2020/08/08
12:13:14}'
type: string
user_id:
example: 用户ID
type: string
type: object
dsDataex_MyModel_MySwagger.Result: dsDataex_MyModel_MySwagger.Result:
properties: properties:
data: data:
@ -496,14 +557,14 @@ paths:
post: post:
consumes: consumes:
- application/json - application/json
description: 【数据交换平台】数据汇集服务接口支持批量入库一个批次少于100条数据。 description: 【数据交换平台】用户行为数据汇集服务接口支持用户事件批量入库一个批次少于100个事件数据。
parameters: parameters:
- description: 汇集数据 - description: 汇集数据
in: body in: body
name: input name: input
required: true required: true
schema: schema:
$ref: '#/definitions/MySwagger.DataIn' $ref: '#/definitions/MySwagger.DataCollect'
produces: produces:
- application/json - application/json
responses: responses:

@ -15,8 +15,8 @@ import (
"dsDataex/Utils" "dsDataex/Utils"
"dsDataex/Utils/CacheUtil" "dsDataex/Utils/CacheUtil"
"dsDataex/Utils/ConfigUtil" "dsDataex/Utils/ConfigUtil"
"dsDataex/Utils/ES7SqlUtil"
"dsDataex/Utils/ES7Util" "dsDataex/Utils/ES7Util"
"dsDataex/Utils/KafkaUtil"
_ "dsDataex/docs" _ "dsDataex/docs"
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -111,16 +111,14 @@ func main() {
fmt.Println("Dsideal DataEX GO GO GO !!!") fmt.Println("Dsideal DataEX GO GO GO !!!")
fmt.Println("ES Server :" + ES7Util.ServerVersion) fmt.Println("ES Server :" + ES7Util.ServerVersion)
fmt.Println("ES Server :" + ES7SqlUtil.ServerVersion) //fmt.Println("ES Server :" + ES7SqlUtil.ServerVersion)
CacheUtil.OrgtreeCacheInit()
//KafkaUtil.Provide()
//KafkaUtil.Consume()
fmt.Println("Kafka Server :" + KafkaUtil.KafkaBroker )
//KafkaUtil.CreateTopic("log_12345") //KafkaUtil.CreateTopic("log_12345")
//KafkaUtil.DeleteTopic("log_12345") //KafkaUtil.DeleteTopic("log_12345")
CacheUtil.OrgtreeCacheInit()
GinServerInit() GinServerInit()
g.Go(func() error { g.Go(func() error {

Loading…
Cancel
Save