kafka cluster

master
zhangjun 5 years ago
parent fba49e92fb
commit 5168e36f5a

@@ -43660,3 +43660,13 @@
[Error]2020/07/16 14:26:41 CreateRedisError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
[Error]2020/07/16 15:27:36 RedisError redis: nil
[Error]2020/07/16 15:29:35 RedisError redis: nil
[Error]2020/08/06 11:23:46 CreateRedisError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
[Error]2020/08/06 11:24:05 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
[Error]2020/08/06 11:24:07 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
[Error]2020/08/06 11:24:09 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
[Error]2020/08/06 11:24:27 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
[Error]2020/08/06 11:24:29 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
[Error]2020/08/06 11:24:31 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
[Error]2020/08/06 11:24:57 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
[Error]2020/08/06 11:24:59 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
[Error]2020/08/06 11:25:01 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.

@@ -23,7 +23,7 @@ expireTime = 86400
;expireTime = 86400
[kafka]
brokers = 10.10.14.238:9092,
brokers = 10.10.14.238:9092,10.10.14.237:9092
;brokers = 192.168.0.200:9092,192.168.0.200:9091
KafkaAccessLogTopic = log_dataex
partition = 20
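The [kafka] section now lists both cluster nodes in one comma-separated brokers value. A minimal sketch of splitting such an entry into a broker slice, tolerating the trailing comma seen in the old single-node value (splitBrokers is an illustrative name, not the project's ConfigUtil loader):

// Sketch only: split a comma-separated broker entry into a slice, dropping
// the empty element left by a trailing comma.
package main

import (
	"fmt"
	"strings"
)

func splitBrokers(raw string) []string {
	var brokers []string
	for _, b := range strings.Split(raw, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	fmt.Println(splitBrokers("10.10.14.238:9092,10.10.14.237:9092"))
	// Output: [10.10.14.238:9092 10.10.14.237:9092]
}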

@@ -211,7 +211,9 @@ func Orgtree2Redis(list []map[string]interface{}, cacheName string) {
ids=ids+"##"+bean["id"].(string)
}
RedisUtil.RedisClient.Set(cacheName, ids,10*time.Hour)
//change by zhangjun 2020-08-06
//RedisUtil.RedisClient.Set(cacheName, ids,10*time.Hour)
RedisUtil.RedisClient.Set(cacheName, ids, 0)
}
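The only functional change in Orgtree2Redis is the expiration argument: with the go-redis call shape used here, a zero expiration stores the key without a TTL, so the cached org tree no longer vanishes after 10 hours. The two variants side by side (same RedisUtil.RedisClient call shape as in the diff):

// Zero expiration: the key persists until it is deleted or overwritten.
RedisUtil.RedisClient.Set(cacheName, ids, 0)
// Positive expiration: the key is evicted automatically after the duration.
RedisUtil.RedisClient.Set(cacheName, ids, 10*time.Hour)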
func Orgtree2Redis2(list []map[string]interface{}, cacheName string) {

@@ -11,6 +11,7 @@ import (
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/snappy"
"math/rand"
"net"
"strconv"
"sync"
"time"
@@ -35,7 +36,12 @@ func init() {
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
brokers, _ := KafkaClient.Brokers()
KafkaBroker = brokers[0].Host + ":" + strconv.Itoa(brokers[0].Port) + "【" + strconv.Itoa(brokers[0].ID) + "】"
//change by zhangjun 2020-08-14
for no := 0; no < len(brokers); no++ {
KafkaBroker += brokers[no].Host + ":" + strconv.Itoa(brokers[no].Port) + "【" + strconv.Itoa(brokers[no].ID) + "】,"
}
//KafkaBroker = brokers[0].Host + ":" + strconv.Itoa(brokers[0].Port) + "【" + strconv.Itoa(brokers[0].ID) + "】"
writerPool = make(map[string]*kafka.Writer)
kafkaPool = make(map[string]map[int]*kafka.Conn)
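init now walks every broker returned by Brokers() instead of reporting only the first one, which is what the cluster change required. A hedged sketch of the same summary built with fmt.Sprintf and strings.Join (equivalent output apart from the trailing comma; assumes a strings import, and Broker's Host/Port/ID fields as in segmentio/kafka-go):

// Sketch: equivalent broker summary built with strings.Join
// (no trailing comma, otherwise the same output as the loop above).
parts := make([]string, 0, len(brokers))
for _, b := range brokers {
	parts = append(parts, fmt.Sprintf("%s:%d【%d】", b.Host, b.Port, b.ID))
}
KafkaBroker = strings.Join(parts, ",")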
@@ -67,8 +73,10 @@ func ProvideLow(topic string, datas []DataEX.KafkaData) (bool, string) {
if len(parts) == 1 && offset == 0 { // initialize the topic
DeleteTopic(topic)
//change by zhangjun 2020-08-13 Kafka Cluster
time.Sleep(100 * time.Millisecond)
CreateTopic(topic)
//CreateTopic(topic)
CreateTopic2(topic)
client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
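A topic that reports a single partition still at offset 0 is treated as uninitialised: it is deleted, the code pauses 100 ms so the asynchronous delete can settle on the brokers, and CreateTopic2 recreates it through the cluster controller before the leader is dialled again. A sketch of those steps pulled into one helper with an explicit error result (recreateTopic is a hypothetical name; the project runs the steps inline):

// Sketch: the re-initialisation steps as one helper. DeleteTopic, CreateTopic2
// and ConfigUtil.KafkaBrokers are the symbols from this file; the helper
// itself is illustrative.
func recreateTopic(topic string) (*kafka.Conn, error) {
	DeleteTopic(topic)
	// Topic deletion is asynchronous on the broker side; the pause lowers the
	// chance that the create request races the pending delete.
	time.Sleep(100 * time.Millisecond)
	CreateTopic2(topic)
	return kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
}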
@@ -361,6 +369,32 @@ func CreateTopic(topic string) {
}
}
/**
* @Author zhangjun
* @Description Kafka Cluster
* @Date 2020-08-14 08:09
* @Param
* @return
**/
func CreateTopic2(topic string) {
KafkaClient, _ := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], "__consumer_offsets", 0)
control, _ := KafkaClient.Controller()
conn, _ := net.Dial("tcp", control.Host+":"+strconv.Itoa(control.Port))
client := kafka.NewConn(conn, "__consumer_offsets", 0)
err := client.CreateTopics(kafka.TopicConfig{
NumPartitions: int(ConfigUtil.KafkaParts),
ReplicationFactor: int(ConfigUtil.KafkaReply),
Topic: topic,
})
if err != nil {
fmt.Println(err.Error())
}
}
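The difference from CreateTopic is the extra hop: CreateTopic2 asks a broker for the cluster controller via Controller() and sends CreateTopics to that controller, the only broker that accepts topic creation in a multi-node cluster. A hedged sketch of the same flow with errors propagated and connections closed, using kafka.Dial rather than wrapping a raw net.Conn (the function name is illustrative):

// Sketch: controller-based topic creation with errors surfaced and
// connections closed. createTopicOnController is an illustrative name.
func createTopicOnController(topic string, partitions, replicas int) error {
	conn, err := kafka.Dial("tcp", ConfigUtil.KafkaBrokers[0])
	if err != nil {
		return err
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		return err
	}

	ctrlConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		return err
	}
	defer ctrlConn.Close()

	return ctrlConn.CreateTopics(kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     partitions,
		ReplicationFactor: replicas,
	})
}

Called as createTopicOnController(topic, int(ConfigUtil.KafkaParts), int(ConfigUtil.KafkaReply)) it would mirror the call made by CreateTopic2 above.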
/**
* @Author zhangjun
* @Description Kafka Topic

(File diff suppressed because it is too large)

@@ -118,6 +118,7 @@ func main() {
//fmt.Println("ES Server :" + ES7SqlUtil.ServerVersion)
fmt.Println("Kafka Server :" + KafkaUtil.KafkaBroker )
//KafkaUtil.CreateTopic2("log_test")
//var token=MD5Util.MD5V1("TEST_007" + "20200707" + "DSDataex_Token_7ee1f0f76243449f8d75f40fdcc2b93d")
//fmt.Println("AccessToken : "+token)
@@ -126,7 +127,7 @@ func main() {
GinServerInit()
g.Go(func() error {
fmt.Printf("dsDataEX Gin服务启动成功,服务端口: %s\n", ConfigUtil.ProjectPort)
fmt.Printf("dsDataEX Gin服务启动成功,服务地址: http://127.0.0.1:%s/docs \n", ConfigUtil.ProjectPort)
return r.Run(":" + ConfigUtil.ProjectPort)
})
@@ -136,7 +137,7 @@ func main() {
g.Go(func() error {
lis, _ := net.Listen("tcp", ":"+ConfigUtil.ProjectGrpc)
fmt.Printf("dsDataEX GRPC服务发布成功,服务端口: %s\n", ConfigUtil.ProjectGrpc)
fmt.Printf("dsDataEX GRPC服务发布成功,服务地址: %s\n", ConfigUtil.ProjectGrpc)
return s.Serve(lis)
})
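main starts the Gin HTTP server and the gRPC server as two goroutines in a single errgroup, so a failure in either listener can surface through the group's Wait. A minimal sketch of that pattern, assuming g comes from errgroup.WithContext and that r and s are the project's gin.Engine and grpc.Server (the closing g.Wait() is not shown in this diff and is assumed):

// Sketch of the startup pattern used above; r, s and the port values stand in
// for the project's gin.Engine, grpc.Server and ConfigUtil settings.
g, _ := errgroup.WithContext(context.Background())

g.Go(func() error {
	// Gin blocks in Run until the HTTP listener fails.
	return r.Run(":" + ConfigUtil.ProjectPort)
})

g.Go(func() error {
	lis, err := net.Listen("tcp", ":"+ConfigUtil.ProjectGrpc)
	if err != nil {
		return err
	}
	// gRPC blocks in Serve until the listener fails or the server stops.
	return s.Serve(lis)
})

if err := g.Wait(); err != nil {
	log.Fatal(err)
}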
