diff --git a/dsDataex/Config/Config.ini b/dsDataex/Config/Config.ini index 376d0fa8..a3317518 100644 --- a/dsDataex/Config/Config.ini +++ b/dsDataex/Config/Config.ini @@ -25,7 +25,9 @@ expireTime = 86400 [kafka] -KafkaAddress = 127.0.0.1:9092 +brokers = 192.168.0.200:9092,192.168.0.200:9091 + + [elasticsearch] nodes = http://10.10.14.61:9200,http://10.10.14.62:9200,http://10.10.14.63:9200 @@ -40,5 +42,6 @@ project_grpc = 9010 project_path = D:\GoWork\dsMin\dsDataex [params] -root_orgname = 辽阳市教育局 +;root_orgname = 辽阳市教育局 ;root_orgname = 赤峰市教育局 +root_orgname = 长春市教育局 diff --git a/dsDataex/MyService/DataEX/DataexDAO/DataexDAO.go b/dsDataex/MyService/DataEX/DataexDAO/DataexDAO.go index 34ac2105..697b70cc 100644 --- a/dsDataex/MyService/DataEX/DataexDAO/DataexDAO.go +++ b/dsDataex/MyService/DataEX/DataexDAO/DataexDAO.go @@ -160,8 +160,8 @@ func GetDatasourceOrgIDS(consumeType int,orgID string) []string { func CheckProvideOrgID(provideType int, provideOrgid string,orgID string) (bool, string, map[string]interface{}, error){ //change by zhangjun 2020-07-15 - //ids:=[]string{orgID} - ids:=[]string{strings.ToLower(orgID)} + ids:=[]string{orgID} + //ids:=[]string{strings.ToLower(orgID)} result := CacheUtil.GetListByIds( ids ,CacheUtil.GetBean("t_dataex_orgtree")) diff --git a/dsDataex/MyService/DataEX/DataexService/BaseGOProc.go b/dsDataex/MyService/DataEX/DataexService/BaseGOProc.go index ff627ae1..508fd127 100644 --- a/dsDataex/MyService/DataEX/DataexService/BaseGOProc.go +++ b/dsDataex/MyService/DataEX/DataexService/BaseGOProc.go @@ -11,7 +11,6 @@ import ( "encoding/json" "reflect" "strconv" - "strings" "time" ) @@ -26,8 +25,8 @@ func OrgtreeProc(datas []MySwagger.Data) { var data models.TDataexOrgtree //change by zhangjun 2020-07-15 - //data.Id = jsonData["org_id"].(string) - data.Id = strings.ToLower(jsonData["org_id"].(string)) + data.Id = jsonData["org_id"].(string) + //data.Id = strings.ToLower(jsonData["org_id"].(string)) data.OrgName = jsonData["org_name"].(string) switch jsonData["org_type"].(type) { @@ -142,7 +141,9 @@ func OrgtreeProcBatch(datas []MySwagger.Data) { json.Unmarshal([]byte(datas[no].Data), &jsonData) - IDs =append(IDs,strings.ToLower(jsonData["org_id"].(string))) + //change by zhangjun 2020-07-23 + //IDs =append(IDs,strings.ToLower(jsonData["org_id"].(string))) + IDs =append(IDs,jsonData["org_id"].(string)) if Contains(areaIDs,jsonData["area_code"].(string))==-1 { areaIDs = append(areaIDs, jsonData["area_code"].(string)) @@ -281,7 +282,7 @@ func OrgtreeProcBatch(datas []MySwagger.Data) { sql = "update t_dataex_orgtree set org_name = '" + jsonData["org_name"].(string)+"',org_type="+ orgType +",school_type='"+schoolType+"',parent_id ='" + parentID + "',province_id='"+provinceId+"',city_id='"+cityId+"',area_id='"+areaId+"',change_time='"+time.Now().Format("2006-01-02 15:04:05")+"' where id='" + IDs[no] + "'" }else { - sql = "insert into t_dataex_orgtree (id,org_name,org_type,school_type,parent_id,province_id,city_id,area_id,create_time,delete_flag,enable_flag) values ('"+strings.ToLower(jsonData["org_id"].(string))+"','"+jsonData["org_name"].(string)+"',"+orgType+",'"+schoolType+"','"+parentID+"','"+provinceId+"','"+cityId+"','"+areaId+"','"+time.Now().Format("2006-01-02 15:04:05")+"',-1,1)" + sql = "insert into t_dataex_orgtree (id,org_name,org_type,school_type,parent_id,province_id,city_id,area_id,create_time,delete_flag,enable_flag) values ('"+ jsonData["org_id"].(string)+"','"+jsonData["org_name"].(string)+"',"+orgType+",'"+schoolType+"','"+parentID+"','"+provinceId+"','"+cityId+"','"+areaId+"','"+time.Now().Format("2006-01-02 15:04:05")+"',-1,1)" } sqls=append(sqls,sql) diff --git a/dsDataex/MyService/DataEX/DataexService/BaseJavaProc.go b/dsDataex/MyService/DataEX/DataexService/BaseJavaProc.go index 8bbb0c09..601f9754 100644 --- a/dsDataex/MyService/DataEX/DataexService/BaseJavaProc.go +++ b/dsDataex/MyService/DataEX/DataexService/BaseJavaProc.go @@ -8,7 +8,6 @@ import ( "dsDataex/Utils/DbUtil" "encoding/json" "strconv" - "strings" "time" ) @@ -23,7 +22,9 @@ func JavaOrgtreeProcBatch(datas []MySwagger.Data) { json.Unmarshal([]byte(datas[no].Data), &jsonData) var id=jsonData["ORG_ID"].(string) - IDs =append(IDs,strings.ToLower(id)) + //change by zhangjun 2020-07-23 + //IDs =append(IDs,strings.ToLower(id)) + IDs =append(IDs,id) if Contains(areaIDs,jsonData["AREA_CODE"].(string))==-1 { areaIDs = append(areaIDs, jsonData["AREA_CODE"].(string)) diff --git a/dsDataex/MyService/DataEX/DataexService/BaseLuaProc.go b/dsDataex/MyService/DataEX/DataexService/BaseLuaProc.go index e5161f6a..6cc1c59c 100644 --- a/dsDataex/MyService/DataEX/DataexService/BaseLuaProc.go +++ b/dsDataex/MyService/DataEX/DataexService/BaseLuaProc.go @@ -8,7 +8,6 @@ import ( "dsDataex/Utils/DbUtil" "encoding/json" "strconv" - "strings" "time" ) @@ -23,7 +22,9 @@ func LuaOrgtreeProcBatch(datas []MySwagger.Data) { json.Unmarshal([]byte(datas[no].Data), &jsonData) var id=strconv.FormatFloat(jsonData["ORG_ID"].(float64), 'f', -1, 64) - IDs =append(IDs,strings.ToLower(id)) + //change by zhangjun 2020-07-23 + //IDs =append(IDs,strings.ToLower(id)) + IDs =append(IDs,id) if Contains(areaIDs,jsonData["area_code"].(string))==-1 { areaIDs = append(areaIDs, jsonData["area_code"].(string)) diff --git a/dsDataex/MyService/DataEX/DataexService/DataexService.go b/dsDataex/MyService/DataEX/DataexService/DataexService.go index 0e7d3be8..b0cdbbe6 100644 --- a/dsDataex/MyService/DataEX/DataexService/DataexService.go +++ b/dsDataex/MyService/DataEX/DataexService/DataexService.go @@ -9,7 +9,6 @@ import ( "dsDataex/Utils/ES7Util" "encoding/json" "strconv" - "strings" "time" ) @@ -242,8 +241,8 @@ func DataexSetBatch(systemID string, datas []MySwagger.Data,datasource *models.T for no:=0;no< len(datas) && no<1000 ;no++ { //change by zhangjun 2020-07-15 - //dataIDs = append(dataIDs, datas[no].DataID) - dataIDs = append(dataIDs, strings.ToLower(datas[no].DataID)) + dataIDs = append(dataIDs, datas[no].DataID) + //dataIDs = append(dataIDs, strings.ToLower(datas[no].DataID)) } now :=time.Now() @@ -267,11 +266,12 @@ func DataexSetBatch(systemID string, datas []MySwagger.Data,datasource *models.T esData2.EndTime = DataEX.JsonDate(time.Date(9999,9,9,9,9,9,0,time.Now().Location())) esData2.DelFlag=datas[no].DelFlag + //change by zhangjun 2020-07-15 - //esData2.OrgId=datas[no].OrgID - //esData2.DataId=datas[no].DataID - esData2.OrgId=strings.ToLower(datas[no].OrgID) - esData2.DataId=strings.ToLower(datas[no].DataID) + esData2.OrgId=datas[no].OrgID + esData2.DataId=datas[no].DataID + //esData2.OrgId=strings.ToLower(datas[no].OrgID) + //esData2.DataId=strings.ToLower(datas[no].DataID) var jsonData map[string]interface{} json.Unmarshal([]byte(datas[no].Data), &jsonData) diff --git a/dsDataex/Utils/KafkaUtil/KafkaUtil.go b/dsDataex/Utils/KafkaUtil/KafkaUtil.go new file mode 100644 index 00000000..1f97c23c --- /dev/null +++ b/dsDataex/Utils/KafkaUtil/KafkaUtil.go @@ -0,0 +1,128 @@ +package KafkaUtil + +import ( + "bytes" + "context" + "fmt" + "github.com/segmentio/kafka-go" + "time" +) + +var KafkaClient *kafka.Conn + +var CTX context.Context + +func init() { + + CTX=context.Background() +} + +func ProvideLow(topic string) { + KafkaClient, _ = kafka.DialLeader(context.Background(), "tcp", "192.168.0.200:9092", topic , 0) + + KafkaClient.SetWriteDeadline(time.Now().Add(5 *time.Second)) + + KafkaClient.WriteMessages( + kafka.Message{Value: []byte("one!")}, + kafka.Message{Value: []byte("two!")}, + kafka.Message{Value: []byte("three!")}, + ) + + //KafkaClient.Close() +} + +func ConsumeLow() { + KafkaClient.SetReadDeadline(time.Now().Add(5 *time.Second)) + + batch := KafkaClient.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max + + for { + b := make([]byte, 10e3) // 10KB max per message + + _, err := batch.Read(b) + + if err != nil { + break + } + index := bytes.IndexByte(b, 0) + + fmt.Println(string(b[0:index])) + } + + batch.Close() + //KafkaClient.Close() +} + +func Consume() { + + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"192.168.0.200:9092"}, + Topic: "log_login", + //Partition: 0, + GroupID: "Group_04",//必须指定 Group,否则需要指定 Partition!!! + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }) + + for { + m, err := r.ReadMessage( CTX ) + + if err != nil { + break + } + + fmt.Printf("message at partiton %d offset %d: %s ==> %s\n",m.Partition, m.Offset, string(m.Key), string(m.Value)) + } + + r.Close() +} + +func Provide(){ + + w := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{"192.168.0.200:9092"}, + Topic: "log_login", + Balancer: &kafka.LeastBytes{},//Partition 自动分配器 + }) + + w.WriteMessages(context.Background(), + kafka.Message{ + //Key: []byte("Key-A"), + Value: []byte("Hello World! One"), + }, + kafka.Message{ + Value: []byte("Hello World! Two"), + }, + kafka.Message{ + Value: []byte("Hello World! Three"), + }, + ) + + w.Close() +} + +func CreateTopic(topic string){ + + KafkaClient, _ = kafka.DialLeader(CTX , "tcp", "10.10.14.238:9092", "__consumer_offsets" , 0) + + err:= KafkaClient.CreateTopics(kafka.TopicConfig{ + NumPartitions: 8, + ReplicationFactor: 1, + Topic: topic, + }) + + if err != nil{ + fmt.Println(err.Error()) + } +} + +func DeleteTopic(topic string){ + KafkaClient, _ = kafka.DialLeader( CTX, "tcp", "10.10.14.238:9092", topic , 0) + + err:= KafkaClient.DeleteTopics(topic) + + if err != nil{ + fmt.Println(err.Error()) + } + +} \ No newline at end of file diff --git a/dsDataex/Utils/KafkaUtil2/KafkaUtil2.go b/dsDataex/Utils/KafkaUtil2/KafkaUtil2.go new file mode 100644 index 00000000..604ab664 --- /dev/null +++ b/dsDataex/Utils/KafkaUtil2/KafkaUtil2.go @@ -0,0 +1,77 @@ +package KafkaUtil2 + +import ( + "fmt" + "github.com/confluentinc/confluent-kafka-go/kafka" +) +/** + * @Author zhangjun + * @Description + * @Date 2020-07-22 09:58 + * @Param TODO: NOT IN USE,需要 gcc 编译 librdkafka,增加了部署复杂度!!!暂不使用!!! + * @return + **/ +func Consume() { + + c, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": "localhost", + "group.id": "myGroup", + "auto.offset.reset": "earliest", + }) + + if err != nil { + panic(err) + } + + //TODO: 支持正则表达式,模糊匹配 Topic!!!牛!!! + c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil) + + for { + msg, err := c.ReadMessage(-1) + if err == nil { + fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) + } else { + // The client will automatically try to recover from all errors. + fmt.Printf("Consumer error: %v (%v)\n", err, msg) + } + } + + c.Close() +} + +func Provide() { + + p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"}) + if err != nil { + panic(err) + } + + defer p.Close() + + // Delivery report handler for produced messages + go func() { + for e := range p.Events() { + switch ev := e.(type) { + case *kafka.Message: + if ev.TopicPartition.Error != nil { + fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) + } else { + fmt.Printf("Delivered message to %v\n", ev.TopicPartition) + } + } + } + }() + + // Produce messages to topic (asynchronously) + topic := "myTopic" + + for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} { + p.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, + Value: []byte(word), + }, nil) + } + + // Wait for message deliveries before shutting down + p.Flush(15 * 1000) +} \ No newline at end of file diff --git a/dsDataex/go.mod b/dsDataex/go.mod index f1d416f6..1b829466 100644 --- a/dsDataex/go.mod +++ b/dsDataex/go.mod @@ -11,6 +11,7 @@ require ( github.com/agrison/mxj v0.0.0-20160310142625-1269f8afb3b4 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 github.com/bndr/gotabulate v1.1.2 // indirect + github.com/confluentinc/confluent-kafka-go v1.4.2 github.com/elastic/go-elasticsearch/v6 v6.8.10 // indirect github.com/elastic/go-elasticsearch/v7 v7.8.0 github.com/fsnotify/fsnotify v1.4.9 // indirect @@ -26,6 +27,7 @@ require ( github.com/olivere/elastic/v7 v7.0.17 github.com/pkg/errors v0.9.1 github.com/satori/go.uuid v1.2.0 + github.com/segmentio/kafka-go v0.3.7 github.com/sirupsen/logrus v1.6.0 github.com/swaggo/gin-swagger v1.2.0 github.com/swaggo/swag v1.5.1 diff --git a/dsDataex/go.sum b/dsDataex/go.sum index 796b4efc..05bb09f8 100644 --- a/dsDataex/go.sum +++ b/dsDataex/go.sum @@ -84,6 +84,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= +github.com/confluentinc/confluent-kafka-go v1.4.2 h1:13EK9RTujF7lVkvHQ5Hbu6bM+Yfrq8L0MkJNnjHSd4Q= +github.com/confluentinc/confluent-kafka-go v1.4.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -113,6 +115,7 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUn github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-linebreak v0.0.0-20180812204043-d8f37254e7d3/go.mod h1:FDHdQKtI1NtvxIYsG/y+ymRaIQIsp+LRSTGl7eBKQEU= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= @@ -360,6 +363,7 @@ github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0 github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.10.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= @@ -452,6 +456,7 @@ github.com/pdfcpu/pdfcpu v0.3.2/go.mod h1:/ULj8B76ZnB4445B0yuSASQqlN0kEO+khtEnmP github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= github.com/phpdave11/gofpdi v1.0.7/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -476,6 +481,8 @@ github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfF github.com/rwcarlsen/goexif v0.0.0-20180518182100-8d986c03457a/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/segmentio/kafka-go v0.3.7 h1:UCFPJw6KoVkmrilA2LbWVuybJojHzj6gDDFdV7H7IBs= +github.com/segmentio/kafka-go v0.3.7/go.mod h1:8rEphJEczp+yDE/R5vwmaqZgF1wllrl4ioQcNKB8wVA= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY= @@ -575,6 +582,7 @@ github.com/valyala/quicktemplate v1.4.1/go.mod h1:EH+4AkTd43SvgIbQHYu59/cJyxDoOV github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/xormplus/builder v0.0.0-20200331055651-240ff40009be h1:HTSana2sMSKVze3XXYrF89w2tw835Fh+7xX5KPvAkuo= github.com/xormplus/builder v0.0.0-20200331055651-240ff40009be/go.mod h1:PgBA7NoHtnttVkWModa/qpvIWkX6MpOKgyRCWsSKSB0= @@ -604,6 +612,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a h1:YX8ljsm6wXlHZO+aRz9Exqr0evNhKRNe5K/gi+zKh4U= golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/dsDataex/main.go b/dsDataex/main.go index 87f656c5..695a354f 100644 --- a/dsDataex/main.go +++ b/dsDataex/main.go @@ -115,6 +115,12 @@ func main() { CacheUtil.OrgtreeCacheInit() + //KafkaUtil.Provide() + //KafkaUtil.Consume() + + //KafkaUtil.CreateTopic("log_12345") + //KafkaUtil.DeleteTopic("log_12345") + GinServerInit() g.Go(func() error {