kafka cluster

master
zhangjun 5 years ago
parent b69243b87d
commit a936b249f7

@@ -43670,3 +43670,25 @@
 [Error]2020/08/06 11:24:57 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
 [Error]2020/08/06 11:24:59 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
 [Error]2020/08/06 11:25:01 SqlQueryError dial tcp 127.0.0.1:6379: connectex: No connection could be made because the target machine actively refused it.
+[Error]2020/08/14 13:41:17 SqlQueryError read tcp 127.0.0.1:51290->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:20 SqlQueryError read tcp 127.0.0.1:51293->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:23 SqlQueryError read tcp 127.0.0.1:51296->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:38 SqlQueryError read tcp 127.0.0.1:51319->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:39 SqlQueryError read tcp 127.0.0.1:51320->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:41 SqlQueryError read tcp 127.0.0.1:51322->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:42 SqlQueryError read tcp 127.0.0.1:51323->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:44 SqlQueryError read tcp 127.0.0.1:51325->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:45 SqlQueryError read tcp 127.0.0.1:51326->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:50 SqlQueryError read tcp 127.0.0.1:51329->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:53 SqlQueryError read tcp 127.0.0.1:51330->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:41:56 SqlQueryError read tcp 127.0.0.1:51331->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:49:14 CreateRedisError read tcp 127.0.0.1:51390->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:49:41 SqlQueryError read tcp 127.0.0.1:51407->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:49:44 SqlQueryError read tcp 127.0.0.1:51408->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:49:47 SqlQueryError read tcp 127.0.0.1:51409->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:50:14 SqlQueryError read tcp 127.0.0.1:51422->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:50:17 SqlQueryError read tcp 127.0.0.1:51424->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:50:20 SqlQueryError read tcp 127.0.0.1:51425->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:51:00 SqlQueryError read tcp 127.0.0.1:51448->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:51:03 SqlQueryError read tcp 127.0.0.1:51450->127.0.0.1:6379: i/o timeout
+[Error]2020/08/14 13:51:06 SqlQueryError read tcp 127.0.0.1:51453->127.0.0.1:6379: i/o timeout
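The appended log lines record the service failing to reach Redis at 127.0.0.1:6379, first with "connection refused" (nothing listening on the port) and later with read i/o timeouts (connected, but answers arrive too slowly). The repository's Redis client setup is not part of this diff; below is a minimal, hypothetical go-redis sketch showing where those two error classes come from and how the corresponding timeouts might be tuned.

// Hypothetical sketch, not code from this repository: dial/read timeouts and
// retries with github.com/go-redis/redis/v8, using the address from the log.
package main

import (
    "context"
    "log"
    "time"

    "github.com/go-redis/redis/v8"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr:        "127.0.0.1:6379",
        DialTimeout: 5 * time.Second, // "connection refused" is reported from the dial phase
        ReadTimeout: 3 * time.Second, // "read tcp ... i/o timeout" is reported from reads
        MaxRetries:  2,               // retry transient failures before surfacing the error
    })
    if err := rdb.Ping(context.Background()).Err(); err != nil {
        log.Printf("[Error] redis unreachable: %v", err)
    }
}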

@@ -23,11 +23,11 @@ expireTime = 86400
 ;expireTime = 86400
 [kafka]
-brokers = 10.10.14.238:9092,10.10.14.237:9092
+brokers = 10.10.14.237:9092,10.10.14.238:9092
 ;brokers = 192.168.0.200:9092,192.168.0.200:9091
 KafkaAccessLogTopic = log_dataex
 partition = 20
-replication = 1
+replication = 2
 process_no = 1
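This hunk reorders the broker list and raises the topic replication factor from 1 to 2, so each partition gets a replica on the second broker and can survive a single broker failure (Kafka requires at least two live brokers when such a topic is created). The ConfigUtil loader that reads this file is not shown in the commit; the following is a minimal sketch, assuming a standard INI parser (gopkg.in/ini.v1) and hypothetical struct and field names that mirror the keys above.

// Minimal sketch (assumption, not the project's ConfigUtil): reading the
// [kafka] section above with gopkg.in/ini.v1.
package main

import (
    "fmt"
    "strings"

    "gopkg.in/ini.v1"
)

// KafkaConfig mirrors the [kafka] keys; the struct and field names are hypothetical.
type KafkaConfig struct {
    Brokers     []string
    Topic       string
    Partitions  int
    Replication int
    ProcessNo   int
}

func loadKafkaConfig(path string) (*KafkaConfig, error) {
    cfg, err := ini.Load(path)
    if err != nil {
        return nil, err
    }
    sec := cfg.Section("kafka")
    return &KafkaConfig{
        Brokers:     strings.Split(sec.Key("brokers").String(), ","),
        Topic:       sec.Key("KafkaAccessLogTopic").String(),
        Partitions:  sec.Key("partition").MustInt(20),
        Replication: sec.Key("replication").MustInt(2),
        ProcessNo:   sec.Key("process_no").MustInt(1),
    }, nil
}

func main() {
    c, err := loadKafkaConfig("config.ini")
    if err != nil {
        panic(err)
    }
    fmt.Println(c.Brokers, c.Partitions, c.Replication)
}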

@@ -59,10 +59,12 @@ func ProvideLow(topic string, datas []DataEX.KafkaData) (bool, string) {
     var client *kafka.Conn
     clients, flag := kafkaPool[topic]
+    //add by zhangjun 2020-08-17
+    var err error
     if flag == false {
-        client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
+        client, err = CreateConnection(topic, 0)
         clients = make(map[int]*kafka.Conn)
         clients[0] = client
@@ -78,7 +80,7 @@ func ProvideLow(topic string, datas []DataEX.KafkaData) (bool, string) {
         //CreateTopic(topic)
         CreateTopic2(topic)
-        client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, 0)
+        client, _ = CreateConnection(topic, 0)
         clients = make(map[int]*kafka.Conn)
         clients[0] = client
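Both hunks above follow the same pattern: look the topic up in kafkaPool, and on a miss dial a connection for partition 0 and cache it. The declaration of kafkaPool is outside this diff, and several call sites still discard the result of CreateConnection with `client, _ =`, which would cache a nil connection if every broker were down. The sketch below shows one way such a lazily filled pool could look with the error propagated instead; the shape of kafkaPool and the helper name getConn are assumptions.

// Hypothetical sketch of the per-topic / per-partition connection pool implied
// by the hunks above; not the repository's actual declaration.
package kafkautil

import (
    "context"

    kafka "github.com/segmentio/kafka-go"
)

// topic -> partition -> broker connection (assumed shape of kafkaPool).
var kafkaPool = make(map[string]map[int]*kafka.Conn)

// kafkaBrokers stands in for ConfigUtil.KafkaBrokers.
var kafkaBrokers = []string{"10.10.14.237:9092", "10.10.14.238:9092"}

// getConn returns a cached connection for (topic, part) or dials the partition
// leader on first use, returning the dial error instead of caching a nil conn.
func getConn(topic string, part int) (*kafka.Conn, error) {
    clients, ok := kafkaPool[topic]
    if !ok {
        clients = make(map[int]*kafka.Conn)
        kafkaPool[topic] = clients
    }
    if c, ok := clients[part]; ok && c != nil {
        return c, nil
    }
    c, err := kafka.DialLeader(context.Background(), "tcp", kafkaBrokers[0], topic, part)
    if err != nil {
        return nil, err
    }
    clients[part] = c
    return c, nil
}

Note that plain Go maps are not safe for concurrent use; if ProvideLow can run from multiple goroutines, the pool would also need a sync.Mutex. Whether the original code guards this elsewhere is not visible in the diff.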
@@ -104,14 +106,17 @@ func ProvideLow(topic string, datas []DataEX.KafkaData) (bool, string) {
             //add by zhangjun 2020-08-02 check whether the connection has timed out or was closed abnormally
             _, err := client.ReadLastOffset()
             if err != nil {
-                client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, num)
+                //client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, num)
+                client, _ = CreateConnection(topic, num)
                 kafkaPool[topic][num] = client
             }
         } else {
-            client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, num)
+            //client, _ = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, num)
+            client, _ = CreateConnection(topic, num)
             kafkaPool[topic][num] = client
         }
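The hunk above reuses a pooled connection only after probing it: ReadLastOffset is a cheap metadata call, and if it fails the connection is assumed dead (timed out or closed by the broker) and is re-dialed through CreateConnection, with the fresh connection written back into the pool. A compact sketch of that "probe, then reconnect" step follows, under the same assumptions about the pool as the earlier sketch; the helper name ensureLive is hypothetical.

// Hypothetical sketch of the probe-and-reconnect step shown in the hunk above.
// dial stands for a CreateConnection-style fail-over dialer.
package kafkautil

import (
    kafka "github.com/segmentio/kafka-go"
)

type dialFunc func(topic string, part int) (*kafka.Conn, error)

// ensureLive probes conn with a cheap metadata request and re-dials when the
// probe fails, mirroring the ReadLastOffset check in ProvideLow.
func ensureLive(conn *kafka.Conn, dial dialFunc, topic string, part int) (*kafka.Conn, error) {
    if conn != nil {
        if _, err := conn.ReadLastOffset(); err == nil {
            return conn, nil // still healthy, keep using it
        }
        conn.Close() // drop the dead connection before replacing it
    }
    return dial(topic, part)
}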
@@ -132,7 +137,7 @@ func ProvideLow(topic string, datas []DataEX.KafkaData) (bool, string) {
     client.SetWriteDeadline(time.Now().Add(10 * time.Second))
-    _, err := client.WriteMessages(messages...)
+    _, err = client.WriteMessages(messages...)
     //client.Close()
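At the low level used here, kafka.Conn has no built-in retries, so the code sets an explicit write deadline before WriteMessages; the `:=` becomes `=` because err is now declared earlier in the function. For comparison, kafka-go's higher-level Writer accepts the whole broker list, discovers partition leaders itself and retries transient failures. The sketch below is that alternative approach, not what this commit does; the broker and topic values are taken from the config hunk earlier in the commit.

// Alternative sketch (not the committed approach): producing with kafka-go's
// Writer, which handles leader lookup, batching and retries internally.
package main

import (
    "context"
    "log"

    kafka "github.com/segmentio/kafka-go"
)

func main() {
    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{"10.10.14.237:9092", "10.10.14.238:9092"},
        Topic:    "log_dataex",
        Balancer: &kafka.Hash{}, // spread messages across the 20 partitions by key
    })
    defer w.Close()

    err := w.WriteMessages(context.Background(),
        kafka.Message{Key: []byte("device-1"), Value: []byte(`{"example":true}`)},
    )
    if err != nil {
        log.Printf("kafka write failed: %v", err)
    }
}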
@@ -146,6 +151,31 @@ func ProvideLow(topic string, datas []DataEX.KafkaData) (bool, string) {
     }
 }
+
+/**
+ * @Author zhangjun
+ * @Description dial the partition leader of a topic, trying each configured Kafka broker in turn
+ * @Date 2020-08-17 02:37
+ * @Param topic, part
+ * @return *kafka.Conn, error
+ **/
+func CreateConnection(topic string, part int) (*kafka.Conn, error) {
+    //change by zhangjun 2020-08-17 for kafka cluster fail-over
+    client, err := kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[0], topic, part)
+    if err != nil {
+        // the first broker is unreachable: fall back to the remaining brokers in order
+        for no := 1; no < len(ConfigUtil.KafkaBrokers); no++ {
+            client, err = kafka.DialLeader(context.Background(), "tcp", ConfigUtil.KafkaBrokers[no], topic, part)
+            if err == nil {
+                break
+            }
+        }
+    }
+    return client, err
+}
+
 /**
  * @Author zhangjun
  * @Description
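CreateConnection is the core of the fail-over: kafka.DialLeader only needs one reachable bootstrap broker to look up the leader of (topic, partition), so walking the configured broker list keeps producers working when the first broker is down. An equivalent, self-contained formulation is sketched below; the broker slice stands in for ConfigUtil.KafkaBrokers and the function name dialAnyBroker is an assumption.

// Equivalent sketch of the fail-over dial: return the first successful
// DialLeader over the whole broker list, wrapping the last error otherwise.
package kafkautil

import (
    "context"
    "fmt"

    kafka "github.com/segmentio/kafka-go"
)

// brokers stands in for ConfigUtil.KafkaBrokers.
var brokers = []string{"10.10.14.237:9092", "10.10.14.238:9092"}

func dialAnyBroker(ctx context.Context, topic string, part int) (*kafka.Conn, error) {
    var lastErr error
    for _, addr := range brokers {
        conn, err := kafka.DialLeader(ctx, "tcp", addr, topic, part)
        if err == nil {
            return conn, nil
        }
        lastErr = err // remember the failure and try the next broker
    }
    return nil, fmt.Errorf("no reachable kafka broker for %s/%d: %w", topic, part, lastErr)
}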
@@ -343,7 +373,7 @@ func InitTopic(topic string) {
         time.Sleep(100 * time.Millisecond)
-        CreateTopic(topic)
+        CreateTopic2(topic)
     }
 }
@@ -392,6 +422,19 @@ func CreateTopic2(topic string) {
     if err != nil {
         fmt.Println(err.Error())
+        conn2, _ := net.Dial("tcp", ConfigUtil.KafkaBrokers[1])
+        client2 := kafka.NewConn(conn2, "__consumer_offsets", 0)
+        err2 := client2.CreateTopics(kafka.TopicConfig{
+            NumPartitions:     int(ConfigUtil.KafkaParts),
+            ReplicationFactor: int(ConfigUtil.KafkaReply),
+            Topic:             topic,
+        })
+        if err2 != nil {
+            fmt.Println(err2.Error())
+        }
     }
 }
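This fallback retries topic creation against the second configured broker when the first attempt fails. Two caveats are worth noting: the net.Dial error is discarded, and in Kafka a CreateTopics request is only honoured by the cluster controller, so a fixed second broker can still reject it if that broker does not happen to be the controller at the time. kafka-go can look the controller up from any reachable broker; the sketch below shows that alternative. It is not the committed code, and createTopicViaController is a hypothetical name.

// Alternative sketch: find the controller from any reachable broker, then
// create the topic there instead of hard-coding a second broker address.
package kafkautil

import (
    "fmt"
    "net"
    "strconv"

    kafka "github.com/segmentio/kafka-go"
)

func createTopicViaController(brokers []string, topic string, parts, replication int) error {
    var lastErr error
    for _, addr := range brokers {
        conn, err := kafka.Dial("tcp", addr)
        if err != nil {
            lastErr = err
            continue // this broker is down, try the next one
        }
        controller, err := conn.Controller()
        conn.Close()
        if err != nil {
            lastErr = err
            continue
        }
        ctrl, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
        if err != nil {
            lastErr = err
            continue
        }
        defer ctrl.Close()
        return ctrl.CreateTopics(kafka.TopicConfig{
            Topic:             topic,
            NumPartitions:     parts,
            ReplicationFactor: replication,
        })
    }
    return fmt.Errorf("topic %s not created, no usable broker: %w", topic, lastErr)
}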

File diff suppressed because it is too large