diff --git a/dsBaseWeb/Config/Config.ini b/dsBaseWeb/Config/Config.ini index 363d00a3..ac548b98 100644 --- a/dsBaseWeb/Config/Config.ini +++ b/dsBaseWeb/Config/Config.ini @@ -32,8 +32,10 @@ port = 8002 project_name = dsBaseWeb [kafka] -KafkaAddress = 10.10.6.202:9092 -KafkaAccessLogTopic = dsAccessLog +brokers = 10.10.14.238:9092, +partition = 20 +replication = 1 +KafkaAccessLogTopic = log_BaseWeb [sso] ssoServerNw = http://10.10.14.187 diff --git a/dsBaseWeb/Middleware/Middleware.go b/dsBaseWeb/Middleware/Middleware.go index 20e1c3bf..a83ef51a 100644 --- a/dsBaseWeb/Middleware/Middleware.go +++ b/dsBaseWeb/Middleware/Middleware.go @@ -1,10 +1,14 @@ package Middleware import ( + "dsBaseWeb/Utils/CommonUtil" "dsBaseWeb/Utils/ConfigUtil" + "dsBaseWeb/Utils/KafkaUtil" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "github.com/tracer0tong/kafkalogrus" + "os" + "strings" "time" ) @@ -18,21 +22,47 @@ func (u CSTFormatter) Format(e *logrus.Entry) ([]byte, error) { return u.Formatter.Format(e) } +//add by zhangjun 2020-08-03 +func stringEndWith(input string) bool { + + if strings.Contains(input,".png"){ return true } + if strings.Contains(input,".jpg"){ return true } + if strings.Contains(input,".js"){ return true } + if strings.Contains(input,".css"){ return true } + if strings.Contains(input,".ico"){ return true } + if strings.Contains(input,".svg"){ return true} + if strings.Contains(input,".gif"){ return true } + return false +} + func init() { var err error var hook *kafkalogrus.KafkaLogrusHook + + //add by zhangjun 2020-08-03 + //初始化 topic,多个 partition,提高数据批处理性能。 + KafkaUtil.InitTopic(ConfigUtil.KafkaAccessLogTopic) + if hook, err = kafkalogrus.NewKafkaLogrusHook( "kh", logrus.AllLevels, &logrus.JSONFormatter{}, - []string{ConfigUtil.KafkaAddress}, + //change by zhangjun 2020-08-03 + //[]string{ConfigUtil.KafkaAddress}, + ConfigUtil.KafkaBrokers, ConfigUtil.KafkaAccessLogTopic, - true, nil); err != nil { + true, + nil); err != nil { panic(err) } //设置日志格式 logrus.SetFormatter(CSTFormatter{&logrus.JSONFormatter{}}) + + //add by zhangjun 2020-08-03 + src, _:= os.OpenFile("./gin_server.log", os.O_APPEND|os.O_WRONLY, os.ModeAppend) + logrus.SetOutput(src) + logrus.AddHook(hook) } @@ -46,7 +76,7 @@ func LoggerToKafka() gin.HandlerFunc { // 结束时间 endTime := time.Now().Local() // 执行时间 - latencyTime := endTime.Sub(startTime) + latencyTime := endTime.Sub(startTime).Seconds() // 请求方式 reqMethod := c.Request.Method // 请求路由 @@ -76,23 +106,56 @@ func LoggerToKafka() gin.HandlerFunc { identityId = "-1" } - browserId, err := c.Cookie("browser_id") + //add by zhangjun 2020-08-03 + broswerId, err := c.Cookie("browser_id") if err != nil { - browserId = "-1" + broswerId = "-1" } + var properties=make(map[string]interface{}) + properties["req_method"]=reqMethod + properties["latency_time"]=latencyTime + properties["status_code"]=statusCode + + strs:=strings.Split(reqUri,"/") + properties["req_action"]=strs[len(strs)-1] + + //一般静态文件 + if stringEndWith( strs[len(strs)-1]){ + return + } + + //change by zhangjun 2020-08-03 // 日志格式 + //logrus.WithFields(logrus.Fields{ + // "status_code": statusCode, + // "access_time": startTime, + // "latency_time": latencyTime, + // "client_ip": clientIP, + // "req_method": reqMethod, + // "req_uri": reqUri, + // "device_id": deviceId, + // "identity_id": identityId, + // "person_id": personId, + //}).Info() logrus.WithFields(logrus.Fields{ - "status_code": statusCode, - "access_time": startTime, - "latency_time": latencyTime, - "client_ip": clientIP, - "req_method": reqMethod, - "req_uri": reqUri, - "device_id": deviceId, - "identity_id": identityId, - "person_id": personId, - "browser_id": browserId, + "system_id":ConfigUtil.ProjectName, + "datasource_id":ConfigUtil.KafkaAccessLogTopic, + "data_id":CommonUtil.GetUUID(), + + "user_id":personId, + "identity":identityId, + "access_id":broswerId, + "access_ip":clientIP, + "access_way":deviceId, + + "event_type":"101", + "event_name":"gin_webrequest", + "event_time":startTime.Format("2006/01/02 15:04:05"), + "event_uri":reqUri, + "event_seqno":0, + "collect_time":endTime.Format("2006/01/02 15:04:05"), + "event_property":properties, }).Info() } } diff --git a/dsBaseWeb/Utils/ConfigUtil/ConfigUtil.go b/dsBaseWeb/Utils/ConfigUtil/ConfigUtil.go index edea885f..462811eb 100644 --- a/dsBaseWeb/Utils/ConfigUtil/ConfigUtil.go +++ b/dsBaseWeb/Utils/ConfigUtil/ConfigUtil.go @@ -4,6 +4,7 @@ import ( "fmt" "gopkg.in/ini.v1" "os" + "strings" ) var ( @@ -36,10 +37,14 @@ var ( RedisExpireTime int64 //项目名称 ProjectName string + //kafka地址 - KafkaAddress string + //KafkaAddress string + KafkaBrokers []string //kafka web日志主题 KafkaAccessLogTopic string + KafkaParts int64 + KafkaReply int64 //Sso配置 SsoServerNw string @@ -121,10 +126,14 @@ func init() { RedisExpireTime = iniParser.GetInt64("redis", "expireTime") //项目名称 ProjectName = iniParser.GetString("project", "project_name") + //change by zhangjun 2020-08-03 //kafka地址 - KafkaAddress = iniParser.GetString("kafka", "KafkaAddress") + //KafkaAddress = iniParser.GetString("kafka", "KafkaAddress") + KafkaBrokers=strings.Split(iniParser.GetString("kafka", "brokers"),",") //kafka WEB日志主题 KafkaAccessLogTopic = iniParser.GetString("kafka", "KafkaAccessLogTopic") + KafkaReply =iniParser.GetInt64("kafka", "replication") + KafkaParts=iniParser.GetInt64("kafka", "partition") SsoServerNw = iniParser.GetString("sso", "ssoServerNw") SsoServerWw = iniParser.GetString("sso", "ssoServerWw") diff --git a/dsBaseWeb/Utils/KafkaUtil/KafkaUtil.go b/dsBaseWeb/Utils/KafkaUtil/KafkaUtil.go new file mode 100644 index 00000000..b416fe52 --- /dev/null +++ b/dsBaseWeb/Utils/KafkaUtil/KafkaUtil.go @@ -0,0 +1,87 @@ +package KafkaUtil + +import ( + "context" + "dsBaseWeb/Utils/ConfigUtil" + "fmt" + "github.com/segmentio/kafka-go" + "strconv" + "time" +) + +var KafkaBroker string + + + +func init() { + + KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0) + + brokers, _ := KafkaClient.Brokers() + KafkaBroker = brokers[0].Host + ":" + strconv.Itoa(brokers[0].Port) + "【" + strconv.Itoa(brokers[0].ID) + "】" + +} + + +/** + * @Author zhangjun + * @Description + * @Date 2020-08-03 08:47 + * @Param + * @return + **/ +func InitTopic(topic string) { + client, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0) + + parts, _ := client.ReadPartitions() + offset, _ := client.ReadLastOffset() + + if len(parts) == 1 && offset == 0 { + + DeleteTopic(topic) + + time.Sleep(100 * time.Millisecond) + + CreateTopic(topic) + } +} + +/** + * @Author zhangjun + * @Description 创建 Kafka Topic + * @Date 2020-07-29 02:30 + * @Param + * @return + **/ +func CreateTopic(topic string) { + + KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0) + + err := KafkaClient.CreateTopics(kafka.TopicConfig{ + NumPartitions: int(ConfigUtil.KafkaParts), + ReplicationFactor: int(ConfigUtil.KafkaReply), + Topic: topic, + }) + + if err != nil { + fmt.Println(err.Error()) + } +} + +/** + * @Author zhangjun + * @Description 删除Kafka Topic + * @Date 2020-07-29 02:30 + * @Param + * @return + **/ +func DeleteTopic(topic string) { + KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0) + + err := KafkaClient.DeleteTopics(topic) + + if err != nil { + fmt.Println(err.Error()) + } + +} diff --git a/dsBaseWeb/go.mod b/dsBaseWeb/go.mod index f8b3cd1b..a03b5fc0 100644 --- a/dsBaseWeb/go.mod +++ b/dsBaseWeb/go.mod @@ -39,6 +39,7 @@ require ( github.com/pkg/sftp v1.11.0 github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect github.com/satori/go.uuid v1.2.0 + github.com/segmentio/kafka-go v0.3.7 github.com/sirupsen/logrus v1.5.0 github.com/smartystreets/assertions v1.1.0 // indirect github.com/smartystreets/goconvey v1.6.4 // indirect diff --git a/dsBaseWeb/go.sum b/dsBaseWeb/go.sum index 6593aed0..77e35951 100644 --- a/dsBaseWeb/go.sum +++ b/dsBaseWeb/go.sum @@ -217,6 +217,7 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg= github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= @@ -238,6 +239,8 @@ github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/segmentio/kafka-go v0.3.7 h1:UCFPJw6KoVkmrilA2LbWVuybJojHzj6gDDFdV7H7IBs= +github.com/segmentio/kafka-go v0.3.7/go.mod h1:8rEphJEczp+yDE/R5vwmaqZgF1wllrl4ioQcNKB8wVA= github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q= @@ -313,6 +316,7 @@ github.com/xuri/efp v0.0.0-20191019043341-b7dc4fe9aa91 h1:gp02YctZuIPTk0t7qI+wvg github.com/xuri/efp v0.0.0-20191019043341-b7dc4fe9aa91/go.mod h1:uBiSUepVYMhGTfDeBKKasV4GpgBlzJ46gXUBAqV8qLk= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=