Former-commit-id: 863963c3b043804a0491a23800d5e56c49134d75
Former-commit-id: 667b6f829db3351f0ec2f5fbf3399574ab21cd35
1.0
wanggang 4 years ago
parent be3b7f6e79
commit 008af04431

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
@ -12,6 +12,9 @@
</ItemGroup>
<ItemGroup>
<None Update="appsettings.Development.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>

@ -4,8 +4,8 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
@ -21,7 +21,7 @@ namespace Kafka2Doris
private readonly IConfiguration _config;
private readonly IHttpClientFactory _httpClientFactory;
public Worker(ILogger<Worker> logger, IConfiguration config,IHttpClientFactory httpClientFactory)
public Worker(ILogger<Worker> logger, IConfiguration config, IHttpClientFactory httpClientFactory)
{
this._logger = logger;
this._config = config;
@ -34,17 +34,17 @@ namespace Kafka2Doris
{
_logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
//
var topics = _config["topics"].Split(',');
var topics = _config.GetSection("kafka").GetValue("topics", "").Split(',');
foreach (var topic in topics)
{
var conf = new ConsumerConfig
{
BootstrapServers = _config.GetValue("server", "localhost:9092"),
BootstrapServers = _config.GetSection("kafka").GetValue("host", "localhost:9092"),
GroupId = $"doris-v1",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit=false
EnableAutoCommit = false
};
var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout",5));
var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout", 5));
try
{
using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
@ -53,7 +53,8 @@ namespace Kafka2Doris
{
consumer.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
@ -61,12 +62,12 @@ namespace Kafka2Doris
{
var max = _config.GetValue("max", 1000);
var list = new List<string>(max);
while (max>0)
while (max > 0)
{
try
{
var consumeResult = consumer.Consume(timeout);
if(consumeResult == null)
if (consumeResult == null)
{
break;
}
@ -84,32 +85,83 @@ namespace Kafka2Doris
}
max -= 1;
}
if(list.Count>0)
if (list.Count > 0)
{
var httpClient = this._httpClientFactory.CreateClient();
var username = _config.GetValue("username", "root");
var password = _config.GetValue("password", "aA123456!");
var username = _config.GetSection("doris").GetValue("username", "root");
var password = _config.GetSection("doris").GetValue("password", "aA123456!");
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")));
httpClient.DefaultRequestHeaders.Add("Expect", "100-continue");
httpClient.DefaultRequestHeaders.Add("strip_outer_array", "true");
httpClient.DefaultRequestHeaders.Add("strip_outer_array", "true");
httpClient.DefaultRequestHeaders.Add("columns", "UserName,SecurityStamp,PasswordHash,PasswordConfirmed,Email,EmailConfirmed,PhoneNumber,PhoneNumberConfirmed,RealName,IdentityNumber,IdentityConfirmed,NickName,Avatar,Sex,Birthday,LockoutEnabled,AccessFailedCount,LockoutEnd,RowVersion,Created,Modified,Deleted");
httpClient.DefaultRequestHeaders.Add("jsonpaths", "\"$.UserName\", \"$.SecurityStamp\", \"$.PasswordHash`\", \"$.PasswordConfirmed\", \"$.Email\", \"$.EmailConfirmed\", \"$.PhoneNumber\", \"$.PhoneNumberConfirmed\", \"$.RealName\", \"$.IdentityNumber\", \"$.IdentityConfirmed\", \"$.NickName\", \"$.Avatar\", \"$.Sex\", \"$.Birthday\", \"$.LockoutEnabled\", \"$.AccessFailedCount\", \"$.LockoutEnd\", \"$.RowVersion\", \"$.Created\", \"$.Modified\", \"$.Deleted\"");
httpClient.DefaultRequestHeaders.Add("label", DateTime.Now.ToString());
var url = $"{_config.GetValue("doris","http://localhost:8030")}/api/example/User/_stream_load";
var server = _config.GetSection("doris").GetValue("server", "http://localhost:8030");
var database = _config.GetSection("doris").GetValue("database", "example");
var url = $"{server}/api/{database}/User/_stream_load";
using (var multiContent = new MultipartFormDataContent())
{
using (var ms = new MemoryStream())
{
using(var sw = new StreamWriter(ms))
using (var sw = new StreamWriter(ms))
{
sw.WriteLine("");
//{ "Id":"49fa11e7-4404-4fe5-9873-e89a01d8e529","UserName":"super1","SecurityStamp":"123456","PasswordHash":"579f889441b4a55d667233941d72a83ed644f7e5","PasswordConfirmed":1,"Email":"super@test.com","EmailConfirmed":1,"PhoneNumber":null,"PhoneNumberConfirmed":0,"RealName":null,"IdentityNumber":null,"IdentityConfirmed":0,"NickName":"超级管理员","Avatar":null,"Sex":null,"Birthday":null,"LockoutEnabled":0,"AccessFailedCount":0,"LockoutEnd":null,"RowVersion":"1e59e461-af12-446f-b876-eac8f58d3c79","Created":1618377036611675,"Modified":null,"Deleted":null}
sw.WriteLine("[");
for (int i = 0; i < list.Count; i++)
{
var item = list[i];
sw.Write($"{item}");
if (i < list.Count - 1)
{
sw.WriteLine(',');
}
else
{
sw.WriteLine();
}
}
sw.WriteLine("]");
sw.Flush();
var data = ms.ToArray();
Debug.WriteLine(Encoding.UTF8.GetString(data));
var fileContent = new ByteArrayContent(data);
multiContent.Add(fileContent, "file", "input.csv");
var result = httpClient.PostAsync(url, multiContent).Result;//401 http basic
multiContent.Add(fileContent, "file", "example.json");
var result = httpClient.PutAsync(url, multiContent).Result;
if (result.StatusCode == System.Net.HttpStatusCode.OK)
{
var responseText = result.Content.ReadAsStringAsync().Result;
Debug.WriteLine(responseText);
var response = JsonDocument.Parse(responseText);
var label = response.RootElement.GetProperty("Label").GetRawText();
var message = response.RootElement.GetProperty("Message").GetRawText();
this._logger.LogInformation($"{label}:{message}");
var status = response.RootElement.GetProperty("Status").GetRawText();
if (status == "Success" || status == "Publish Timeout")
{
consumer.Commit();
}
}
//{
// "TxnId": 3009,
// "Label": "c1f15903-805d-4d8a-8533-4c96080c49fe",
// "Status": "Success",
// "Message": "OK",
// "NumberTotalRows": 3,
// "NumberLoadedRows": 3,
// "NumberFilteredRows": 0,
// "NumberUnselectedRows": 0,
// "LoadBytes": 1678,
// "LoadTimeMs": 689,
// "BeginTxnTimeMs": 1,
// "StreamLoadPutTimeMs": 4,
// "ReadDataTimeMs": 0,
// "WriteDataTimeMs": 257,
// "CommitAndPublishTimeMs": 421
//}
}
}
}
//httpClient.PutAsync
//consumer.Commit();
}
}
catch (OperationCanceledException)
@ -129,7 +181,7 @@ namespace Kafka2Doris
this._logger.LogError(ex.ToString());
}
}
//
await Task.Delay(this._config.GetValue("delay", 1000 * 60), stoppingToken);
}

@ -0,0 +1,17 @@
{
"Logging": {
"LogLevel": {
"Default": "Warning",
"Microsoft": "Warning"
}
},
"kafka": {
"host": "kafka:9092",
"topics": "mysql.example.User"
},
"doris": {
"server": "http://localhost:8030",
"username": "root",
"password": "aA123456!"
}
}

@ -5,6 +5,14 @@
"Microsoft": "Warning"
}
},
"kafka": "192.168.100.144:9092",
"topics": "mysql.example.User"
"kafka": {
"host": "192.168.100.144:9092",
"topics": "mysql.example.User"
},
"doris": {
"server": "http://doris-fe:8030",
"username": "root",
"password": "aA123456!",
"database": "example"
}
}

@ -12,5 +12,5 @@ while :; do
sleep 5
done
cd /usr/share/confluent-hub-components/
#./start.sh
./start.sh
sleep infinity

@ -0,0 +1,6 @@
[
{"Id":"49fa11e7-4404-4fe5-9873-e89a01d8e529","UserName":"super","SecurityStamp":"123456","PasswordHash":"579f889441b4a55d667233941d72a83ed644f7e5","PasswordConfirmed":1,"Email":"super@test.com","EmailConfirmed":1,"PhoneNumber":null,"PhoneNumberConfirmed":0,"RealName":null,"IdentityNumber":null,"IdentityConfirmed":0,"NickName":"超级管理员","Avatar":null,"Sex":null,"Birthday":null,"LockoutEnabled":0,"AccessFailedCount":0,"LockoutEnd":null,"RowVersion":"1e59e461-af12-446f-b876-eac8f58d3c79","Created":1618377036611,"Modified":null,"Deleted":null},
{"Id":"74fab867-a65a-4cda-8b2f-63e12d003b79","UserName":"user","SecurityStamp":"123456","PasswordHash":"71dd07494c5ee54992a27746d547e25dee01bd97","PasswordConfirmed":1,"Email":"user@test.com","EmailConfirmed":1,"PhoneNumber":null,"PhoneNumberConfirmed":0,"RealName":null,"IdentityNumber":null,"IdentityConfirmed":0,"NickName":"普通用户","Avatar":null,"Sex":null,"Birthday":null,"LockoutEnabled":0,"AccessFailedCount":0,"LockoutEnd":null,"RowVersion":"2b691577-e96c-40fe-897e-3adc9a4488bf","Created":1618377036611,"Modified":null,"Deleted":null},
{"Id":"e6a2e59c-a4e1-4ced-856c-036b6e4786d1","UserName":"admin","SecurityStamp":"123456","PasswordHash":"579f889441b4a55d667233941d72a83ed644f7e5","PasswordConfirmed":1,"Email":"admin@test.com","EmailConfirmed":1,"PhoneNumber":null,"PhoneNumberConfirmed":0,"RealName":null,"IdentityNumber":null,"IdentityConfirmed":0,"NickName":"管理员","Avatar":null,"Sex":null,"Birthday":null,"LockoutEnabled":0,"AccessFailedCount":0,"LockoutEnd":null,"RowVersion":"7d53094e-569d-4689-a6d1-d2c1f99da85b","Created":1618377036611,"Modified":null,"Deleted":null}
]

@ -3,7 +3,7 @@
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": 1,
"topics": "mysql.example.User",
"topics.regex": "mysql.example.*",
"key.ignore": false,
"schema.ignore": "false",
"key.converter.schemas.enable": false,
@ -12,12 +12,15 @@
"type.name": "kafka-connect",
"write.method": "upsert",
"behavior.on.null.values": "delete",
"transforms": "ExtractFieldObject,ValueToKey,extractKey",
"transforms": "ExtractFieldObject,ValueToKey,extractKey,index",
"transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractFieldObject.field": "after",
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "Id",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "Id"
"transforms.extractKey.field": "Id",
"transforms.index.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.index.regex": "(.*)",
"transforms.index.replacement": "$1"
}
}

@ -11,7 +11,7 @@
"database.server.name": "mysql",
"database.include": "example",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema.example",
"database.history.kafka.topic": "mysql.schema",
"include.schema.changes": "true"
}
}

@ -38,6 +38,23 @@ mysql配置了主从创建了数据库example新建了User表
查看全部插件http://localhost:8083/connector-plugins
查看运行插件http://localhost:8083/connectors 查看插件
## mysql 导入 kafka
debezium 默认每个表一个 topic
设置时间格式:
time.precision.mode
多个表导入一个 topic
"transforms":"Reroute",
"transforms.Reroute.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex":"(.*)",
"transforms.Reroute.topic.replacement":"mysql.example"
## elastic 导入 doris
使用 topics.regex 动态处理多个 topic 到 index
## kibana
查看 elasticsearch 状态: http://localhost:5601
@ -73,6 +90,7 @@ USE example;
### 创建表:
//curl -i -v --location-trusted -u root:aA123456! -H "format: json" -H "strip_outer_array: true" -T example.json http://doris-fe:8030/api/example/User/_stream_load
CREATE TABLE `User` (
`Id` char(36) NOT NULL COMMENT 'Id',
`UserName` varchar(255) NOT NULL COMMENT 'UserName',
@ -89,14 +107,14 @@ CREATE TABLE `User` (
`NickName` varchar(255) REPLACE NULL COMMENT 'NickName',
`Avatar` varchar(255) REPLACE NULL COMMENT 'Avatar',
`Sex` int(0) REPLACE NULL DEFAULT NULL COMMENT 'Sex',
`Birthday` datetime REPLACE NULL DEFAULT NULL COMMENT 'Birthday',
`Birthday` BIGINT REPLACE NULL COMMENT 'Birthday',
`LockoutEnabled` tinyint(1) REPLACE NOT NULL COMMENT 'LockoutEnabled',
`AccessFailedCount` int(0) REPLACE NOT NULL COMMENT 'AccessFailedCount',
`LockoutEnd` datetime REPLACE NULL DEFAULT NULL COMMENT 'LockoutEnd',
`LockoutEnd` BIGINT REPLACE NULL DEFAULT NULL COMMENT 'LockoutEnd',
`RowVersion` varchar(255) REPLACE NULL COMMENT 'RowVersion',
`Created` datetime REPLACE NOT NULL COMMENT 'Created',
`Modified` datetime REPLACE NULL DEFAULT NULL COMMENT 'Modified',
`Deleted` datetime REPLACE NULL DEFAULT NULL COMMENT 'Deleted'
`Created` BIGINT REPLACE NOT NULL COMMENT 'Created',
`Modified` BIGINT REPLACE NULL DEFAULT NULL COMMENT 'Modified',
`Deleted` BIGINT REPLACE NULL DEFAULT NULL COMMENT 'Deleted'
)
AGGREGATE KEY(Id,UserName)
DISTRIBUTED BY HASH(Id) BUCKETS 10
@ -125,7 +143,7 @@ SHOW ALL ROUTINE LOAD;
创建导入任务:
CREATE ROUTINE LOAD example.job2 on User
CREATE ROUTINE LOAD example.job3 on User
--ALTER ROUTINE LOAD FOR example.job1
columns (
`Id`,

Loading…
Cancel
Save