From 008af04431680c137f75b0afd9343eb382f73aa1 Mon Sep 17 00:00:00 2001 From: wanggang <76527413@qq.com> Date: Mon, 10 May 2021 17:06:43 +0800 Subject: [PATCH] update Former-commit-id: 863963c3b043804a0491a23800d5e56c49134d75 Former-commit-id: 667b6f829db3351f0ec2f5fbf3399574ab21cd35 --- labs/doris/Kafka2Doris/Kafka2Doris.csproj | 5 +- labs/doris/Kafka2Doris/Worker.cs | 96 ++++++++++++++----- .../Kafka2Doris/appsettings.Development.json | 17 ++++ labs/doris/Kafka2Doris/appsettings.json | 12 ++- labs/doris/conf/kafka-connect/boot.sh | 2 +- labs/doris/conf/kafka-connect/example.json | 6 ++ .../kafka-connect/kafka2elasticsearch.json | 9 +- .../doris/conf/kafka-connect/mysql2kafka.json | 2 +- labs/doris/readme.md | 30 ++++-- 9 files changed, 143 insertions(+), 36 deletions(-) create mode 100644 labs/doris/Kafka2Doris/appsettings.Development.json create mode 100644 labs/doris/conf/kafka-connect/example.json diff --git a/labs/doris/Kafka2Doris/Kafka2Doris.csproj b/labs/doris/Kafka2Doris/Kafka2Doris.csproj index d2ca6798..3a0ca022 100644 --- a/labs/doris/Kafka2Doris/Kafka2Doris.csproj +++ b/labs/doris/Kafka2Doris/Kafka2Doris.csproj @@ -1,4 +1,4 @@ - + Exe @@ -12,6 +12,9 @@ + + PreserveNewest + PreserveNewest diff --git a/labs/doris/Kafka2Doris/Worker.cs b/labs/doris/Kafka2Doris/Worker.cs index 9a1b37b2..3e7657a3 100644 --- a/labs/doris/Kafka2Doris/Worker.cs +++ b/labs/doris/Kafka2Doris/Worker.cs @@ -4,8 +4,8 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; -using System.Linq; using System.Net.Http; using System.Net.Http.Headers; using System.Text; @@ -21,7 +21,7 @@ namespace Kafka2Doris private readonly IConfiguration _config; private readonly IHttpClientFactory _httpClientFactory; - public Worker(ILogger logger, IConfiguration config,IHttpClientFactory httpClientFactory) + public Worker(ILogger logger, IConfiguration config, IHttpClientFactory httpClientFactory) { this._logger = logger; this._config = config; @@ -34,17 +34,17 @@ namespace Kafka2Doris { _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now); // - var topics = _config["topics"].Split(','); + var topics = _config.GetSection("kafka").GetValue("topics", "").Split(','); foreach (var topic in topics) { var conf = new ConsumerConfig { - BootstrapServers = _config.GetValue("server", "localhost:9092"), + BootstrapServers = _config.GetSection("kafka").GetValue("host", "localhost:9092"), GroupId = $"doris-v1", AutoOffsetReset = AutoOffsetReset.Earliest, - EnableAutoCommit=false + EnableAutoCommit = false }; - var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout",5)); + var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout", 5)); try { using (var consumer = new ConsumerBuilder(conf).Build()) @@ -53,7 +53,8 @@ namespace Kafka2Doris { consumer.Subscribe(topic); CancellationTokenSource cts = new CancellationTokenSource(); - Console.CancelKeyPress += (_, e) => { + Console.CancelKeyPress += (_, e) => + { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; @@ -61,12 +62,12 @@ namespace Kafka2Doris { var max = _config.GetValue("max", 1000); var list = new List(max); - while (max>0) + while (max > 0) { try { var consumeResult = consumer.Consume(timeout); - if(consumeResult == null) + if (consumeResult == null) { break; } @@ -84,32 +85,83 @@ namespace Kafka2Doris } max -= 1; } - if(list.Count>0) + if (list.Count > 0) { var httpClient = this._httpClientFactory.CreateClient(); - var username = _config.GetValue("username", "root"); - var password = _config.GetValue("password", "aA123456!"); + var username = _config.GetSection("doris").GetValue("username", "root"); + var password = _config.GetSection("doris").GetValue("password", "aA123456!"); httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"))); + httpClient.DefaultRequestHeaders.Add("Expect", "100-continue"); + httpClient.DefaultRequestHeaders.Add("strip_outer_array", "true"); + httpClient.DefaultRequestHeaders.Add("strip_outer_array", "true"); + httpClient.DefaultRequestHeaders.Add("columns", "UserName,SecurityStamp,PasswordHash,PasswordConfirmed,Email,EmailConfirmed,PhoneNumber,PhoneNumberConfirmed,RealName,IdentityNumber,IdentityConfirmed,NickName,Avatar,Sex,Birthday,LockoutEnabled,AccessFailedCount,LockoutEnd,RowVersion,Created,Modified,Deleted"); + httpClient.DefaultRequestHeaders.Add("jsonpaths", "\"$.UserName\", \"$.SecurityStamp\", \"$.PasswordHash`\", \"$.PasswordConfirmed\", \"$.Email\", \"$.EmailConfirmed\", \"$.PhoneNumber\", \"$.PhoneNumberConfirmed\", \"$.RealName\", \"$.IdentityNumber\", \"$.IdentityConfirmed\", \"$.NickName\", \"$.Avatar\", \"$.Sex\", \"$.Birthday\", \"$.LockoutEnabled\", \"$.AccessFailedCount\", \"$.LockoutEnd\", \"$.RowVersion\", \"$.Created\", \"$.Modified\", \"$.Deleted\""); httpClient.DefaultRequestHeaders.Add("label", DateTime.Now.ToString()); - var url = $"{_config.GetValue("doris","http://localhost:8030")}/api/example/User/_stream_load"; + var server = _config.GetSection("doris").GetValue("server", "http://localhost:8030"); + var database = _config.GetSection("doris").GetValue("database", "example"); + var url = $"{server}/api/{database}/User/_stream_load"; + using (var multiContent = new MultipartFormDataContent()) { using (var ms = new MemoryStream()) { - using(var sw = new StreamWriter(ms)) + using (var sw = new StreamWriter(ms)) { - sw.WriteLine(""); - //{ "Id":"49fa11e7-4404-4fe5-9873-e89a01d8e529","UserName":"super1","SecurityStamp":"123456","PasswordHash":"579f889441b4a55d667233941d72a83ed644f7e5","PasswordConfirmed":1,"Email":"super@test.com","EmailConfirmed":1,"PhoneNumber":null,"PhoneNumberConfirmed":0,"RealName":null,"IdentityNumber":null,"IdentityConfirmed":0,"NickName":"超级管理员","Avatar":null,"Sex":null,"Birthday":null,"LockoutEnabled":0,"AccessFailedCount":0,"LockoutEnd":null,"RowVersion":"1e59e461-af12-446f-b876-eac8f58d3c79","Created":1618377036611675,"Modified":null,"Deleted":null} + sw.WriteLine("["); + for (int i = 0; i < list.Count; i++) + { + var item = list[i]; + sw.Write($"{item}"); + if (i < list.Count - 1) + { + sw.WriteLine(','); + } + else + { + sw.WriteLine(); + } + } + sw.WriteLine("]"); + sw.Flush(); var data = ms.ToArray(); + Debug.WriteLine(Encoding.UTF8.GetString(data)); var fileContent = new ByteArrayContent(data); - multiContent.Add(fileContent, "file", "input.csv"); - var result = httpClient.PostAsync(url, multiContent).Result;//401 http basic + multiContent.Add(fileContent, "file", "example.json"); + var result = httpClient.PutAsync(url, multiContent).Result; + if (result.StatusCode == System.Net.HttpStatusCode.OK) + { + var responseText = result.Content.ReadAsStringAsync().Result; + Debug.WriteLine(responseText); + var response = JsonDocument.Parse(responseText); + var label = response.RootElement.GetProperty("Label").GetRawText(); + var message = response.RootElement.GetProperty("Message").GetRawText(); + this._logger.LogInformation($"{label}:{message}"); + var status = response.RootElement.GetProperty("Status").GetRawText(); + if (status == "Success" || status == "Publish Timeout") + { + consumer.Commit(); + } + } + //{ + // "TxnId": 3009, + // "Label": "c1f15903-805d-4d8a-8533-4c96080c49fe", + // "Status": "Success", + // "Message": "OK", + // "NumberTotalRows": 3, + // "NumberLoadedRows": 3, + // "NumberFilteredRows": 0, + // "NumberUnselectedRows": 0, + // "LoadBytes": 1678, + // "LoadTimeMs": 689, + // "BeginTxnTimeMs": 1, + // "StreamLoadPutTimeMs": 4, + // "ReadDataTimeMs": 0, + // "WriteDataTimeMs": 257, + // "CommitAndPublishTimeMs": 421 + //} } } - } - //httpClient.PutAsync - //consumer.Commit(); } } catch (OperationCanceledException) @@ -129,7 +181,7 @@ namespace Kafka2Doris this._logger.LogError(ex.ToString()); } } - + // await Task.Delay(this._config.GetValue("delay", 1000 * 60), stoppingToken); } diff --git a/labs/doris/Kafka2Doris/appsettings.Development.json b/labs/doris/Kafka2Doris/appsettings.Development.json new file mode 100644 index 00000000..fe897830 --- /dev/null +++ b/labs/doris/Kafka2Doris/appsettings.Development.json @@ -0,0 +1,17 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning" + } + }, + "kafka": { + "host": "kafka:9092", + "topics": "mysql.example.User" + }, + "doris": { + "server": "http://localhost:8030", + "username": "root", + "password": "aA123456!" + } +} \ No newline at end of file diff --git a/labs/doris/Kafka2Doris/appsettings.json b/labs/doris/Kafka2Doris/appsettings.json index 0f8512d7..4302547b 100644 --- a/labs/doris/Kafka2Doris/appsettings.json +++ b/labs/doris/Kafka2Doris/appsettings.json @@ -5,6 +5,14 @@ "Microsoft": "Warning" } }, - "kafka": "192.168.100.144:9092", - "topics": "mysql.example.User" + "kafka": { + "host": "192.168.100.144:9092", + "topics": "mysql.example.User" + }, + "doris": { + "server": "http://doris-fe:8030", + "username": "root", + "password": "aA123456!", + "database": "example" + } } \ No newline at end of file diff --git a/labs/doris/conf/kafka-connect/boot.sh b/labs/doris/conf/kafka-connect/boot.sh index 40fb6e7d..e15c7532 100644 --- a/labs/doris/conf/kafka-connect/boot.sh +++ b/labs/doris/conf/kafka-connect/boot.sh @@ -12,5 +12,5 @@ while :; do sleep 5 done cd /usr/share/confluent-hub-components/ -#./start.sh +./start.sh sleep infinity diff --git a/labs/doris/conf/kafka-connect/example.json b/labs/doris/conf/kafka-connect/example.json new file mode 100644 index 00000000..0366525b --- /dev/null +++ b/labs/doris/conf/kafka-connect/example.json @@ -0,0 +1,6 @@ +[ + {"Id":"49fa11e7-4404-4fe5-9873-e89a01d8e529","UserName":"super","SecurityStamp":"123456","PasswordHash":"579f889441b4a55d667233941d72a83ed644f7e5","PasswordConfirmed":1,"Email":"super@test.com","EmailConfirmed":1,"PhoneNumber":null,"PhoneNumberConfirmed":0,"RealName":null,"IdentityNumber":null,"IdentityConfirmed":0,"NickName":"超级管理员","Avatar":null,"Sex":null,"Birthday":null,"LockoutEnabled":0,"AccessFailedCount":0,"LockoutEnd":null,"RowVersion":"1e59e461-af12-446f-b876-eac8f58d3c79","Created":1618377036611,"Modified":null,"Deleted":null}, + {"Id":"74fab867-a65a-4cda-8b2f-63e12d003b79","UserName":"user","SecurityStamp":"123456","PasswordHash":"71dd07494c5ee54992a27746d547e25dee01bd97","PasswordConfirmed":1,"Email":"user@test.com","EmailConfirmed":1,"PhoneNumber":null,"PhoneNumberConfirmed":0,"RealName":null,"IdentityNumber":null,"IdentityConfirmed":0,"NickName":"普通用户","Avatar":null,"Sex":null,"Birthday":null,"LockoutEnabled":0,"AccessFailedCount":0,"LockoutEnd":null,"RowVersion":"2b691577-e96c-40fe-897e-3adc9a4488bf","Created":1618377036611,"Modified":null,"Deleted":null}, + {"Id":"e6a2e59c-a4e1-4ced-856c-036b6e4786d1","UserName":"admin","SecurityStamp":"123456","PasswordHash":"579f889441b4a55d667233941d72a83ed644f7e5","PasswordConfirmed":1,"Email":"admin@test.com","EmailConfirmed":1,"PhoneNumber":null,"PhoneNumberConfirmed":0,"RealName":null,"IdentityNumber":null,"IdentityConfirmed":0,"NickName":"管理员","Avatar":null,"Sex":null,"Birthday":null,"LockoutEnabled":0,"AccessFailedCount":0,"LockoutEnd":null,"RowVersion":"7d53094e-569d-4689-a6d1-d2c1f99da85b","Created":1618377036611,"Modified":null,"Deleted":null} +] + \ No newline at end of file diff --git a/labs/doris/conf/kafka-connect/kafka2elasticsearch.json b/labs/doris/conf/kafka-connect/kafka2elasticsearch.json index 693f4975..e1e46633 100644 --- a/labs/doris/conf/kafka-connect/kafka2elasticsearch.json +++ b/labs/doris/conf/kafka-connect/kafka2elasticsearch.json @@ -3,7 +3,7 @@ "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max": 1, - "topics": "mysql.example.User", + "topics.regex": "mysql.example.*", "key.ignore": false, "schema.ignore": "false", "key.converter.schemas.enable": false, @@ -12,12 +12,15 @@ "type.name": "kafka-connect", "write.method": "upsert", "behavior.on.null.values": "delete", - "transforms": "ExtractFieldObject,ValueToKey,extractKey", + "transforms": "ExtractFieldObject,ValueToKey,extractKey,index", "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.ExtractFieldObject.field": "after", "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.ValueToKey.fields": "Id", "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key", - "transforms.extractKey.field": "Id" + "transforms.extractKey.field": "Id", + "transforms.index.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.index.regex": "(.*)", + "transforms.index.replacement": "$1" } } \ No newline at end of file diff --git a/labs/doris/conf/kafka-connect/mysql2kafka.json b/labs/doris/conf/kafka-connect/mysql2kafka.json index b368d326..1ab861ae 100644 --- a/labs/doris/conf/kafka-connect/mysql2kafka.json +++ b/labs/doris/conf/kafka-connect/mysql2kafka.json @@ -11,7 +11,7 @@ "database.server.name": "mysql", "database.include": "example", "database.history.kafka.bootstrap.servers": "kafka:9092", - "database.history.kafka.topic": "schema.example", + "database.history.kafka.topic": "mysql.schema", "include.schema.changes": "true" } } \ No newline at end of file diff --git a/labs/doris/readme.md b/labs/doris/readme.md index 870ae35c..55bd4777 100644 --- a/labs/doris/readme.md +++ b/labs/doris/readme.md @@ -38,6 +38,23 @@ mysql配置了主从,创建了数据库example,新建了User表 查看全部插件:http://localhost:8083/connector-plugins 查看运行插件:http://localhost:8083/connectors 查看插件 +## mysql 导入 kafka + +debezium 默认每个表一个 topic + +设置时间格式: + time.precision.mode + +多个表导入一个 topic: + "transforms":"Reroute", + "transforms.Reroute.type":"io.debezium.transforms.ByLogicalTableRouter", + "transforms.Reroute.topic.regex":"(.*)", + "transforms.Reroute.topic.replacement":"mysql.example" + +## elastic 导入 doris: + +使用 topics.regex 动态处理多个 topic 到 index + ## kibana 查看 elasticsearch 状态: http://localhost:5601 @@ -73,6 +90,7 @@ USE example; ### 创建表: +//curl -i -v --location-trusted -u root:aA123456! -H "format: json" -H "strip_outer_array: true" -T example.json http://doris-fe:8030/api/example/User/_stream_load CREATE TABLE `User` ( `Id` char(36) NOT NULL COMMENT 'Id', `UserName` varchar(255) NOT NULL COMMENT 'UserName', @@ -89,14 +107,14 @@ CREATE TABLE `User` ( `NickName` varchar(255) REPLACE NULL COMMENT 'NickName', `Avatar` varchar(255) REPLACE NULL COMMENT 'Avatar', `Sex` int(0) REPLACE NULL DEFAULT NULL COMMENT 'Sex', - `Birthday` datetime REPLACE NULL DEFAULT NULL COMMENT 'Birthday', + `Birthday` BIGINT REPLACE NULL COMMENT 'Birthday', `LockoutEnabled` tinyint(1) REPLACE NOT NULL COMMENT 'LockoutEnabled', `AccessFailedCount` int(0) REPLACE NOT NULL COMMENT 'AccessFailedCount', - `LockoutEnd` datetime REPLACE NULL DEFAULT NULL COMMENT 'LockoutEnd', + `LockoutEnd` BIGINT REPLACE NULL DEFAULT NULL COMMENT 'LockoutEnd', `RowVersion` varchar(255) REPLACE NULL COMMENT 'RowVersion', - `Created` datetime REPLACE NOT NULL COMMENT 'Created', - `Modified` datetime REPLACE NULL DEFAULT NULL COMMENT 'Modified', - `Deleted` datetime REPLACE NULL DEFAULT NULL COMMENT 'Deleted' + `Created` BIGINT REPLACE NOT NULL COMMENT 'Created', + `Modified` BIGINT REPLACE NULL DEFAULT NULL COMMENT 'Modified', + `Deleted` BIGINT REPLACE NULL DEFAULT NULL COMMENT 'Deleted' ) AGGREGATE KEY(Id,UserName) DISTRIBUTED BY HASH(Id) BUCKETS 10 @@ -125,7 +143,7 @@ SHOW ALL ROUTINE LOAD; 创建导入任务: -CREATE ROUTINE LOAD example.job2 on User +CREATE ROUTINE LOAD example.job3 on User --ALTER ROUTINE LOAD FOR example.job1 columns ( `Id`,