You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
iot/labs/doris/Kafka2Doris/Worker.cs

258 lines
15 KiB

using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
namespace Kafka2Doris
{
/// <summary>
/// Background worker that periodically reads JSON change events from Kafka topics and
/// pushes the <c>payload.after</c> row images to Doris through its Stream Load HTTP API.
/// </summary>
/// <remarks>
/// Configuration keys read by this class:
/// <c>kafka:host</c>, <c>kafka:topics.regex</c>, <c>kafka:timeout</c> (seconds),
/// <c>max</c> (rows per batch), <c>delay</c> (milliseconds between passes),
/// <c>doris:server</c>, <c>doris:database</c>, <c>doris:username</c>, <c>doris:password</c>,
/// <c>doris:buckets</c>, <c>doris:replication_num</c>.
/// NOTE(review): every topic is loaded into the hard-coded Doris table <c>User</c> with a
/// hard-coded column list; only topics carrying that table's rows will load successfully.
/// </remarks>
public class Worker : BackgroundService
{
    /// <summary>Columns of the Doris <c>User</c> table, in load order.</summary>
    private static readonly string[] UserColumns =
    {
        "UserName", "SecurityStamp", "PasswordHash", "PasswordConfirmed", "Email", "EmailConfirmed",
        "PhoneNumber", "PhoneNumberConfirmed", "RealName", "IdentityNumber", "IdentityConfirmed",
        "NickName", "Avatar", "Sex", "Birthday", "LockoutEnabled", "AccessFailedCount", "LockoutEnd",
        "RowVersion", "Created", "Modified", "Deleted"
    };

    private readonly ILogger<Worker> _logger;
    private readonly IConfiguration _config;
    private readonly IHttpClientFactory _httpClientFactory;

    /// <summary>Creates the worker with its logging, configuration and HTTP dependencies.</summary>
    /// <param name="logger">Logger used for progress and error reporting.</param>
    /// <param name="config">Application configuration (see class remarks for the keys used).</param>
    /// <param name="httpClientFactory">Factory used to obtain the client for Doris requests.</param>
    public Worker(ILogger<Worker> logger, IConfiguration config, IHttpClientFactory httpClientFactory)
    {
        _logger = logger;
        _config = config;
        _httpClientFactory = httpClientFactory;
    }

    /// <summary>
    /// Runs synchronization passes until <paramref name="stoppingToken"/> is cancelled. Each pass
    /// discovers the matching topics, processes them one by one, then waits <c>delay</c> milliseconds.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // ExecuteAsync runs on the caller's thread until its first incomplete await. The Kafka
        // calls below block, so yield first to avoid holding up host start-up.
        await Task.Yield();

        while (!stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
            try
            {
                var kafka = _config.GetSection("kafka");
                var conf = new ConsumerConfig
                {
                    BootstrapServers = kafka.GetValue("host", "localhost:9092"),
                    GroupId = "doris-v1",
                    AutoOffsetReset = AutoOffsetReset.Earliest,
                    // Offsets are committed manually, only after Doris accepted the batch.
                    EnableAutoCommit = false
                };
                var topicsRegex = kafka.GetValue("topics.regex", "mysql.example.*");
                var timeout = TimeSpan.FromSeconds(kafka.GetValue("timeout", 20));

                foreach (var topic in FindTopics(conf.BootstrapServers, topicsRegex, timeout))
                {
                    if (stoppingToken.IsCancellationRequested)
                    {
                        break;
                    }

                    try
                    {
                        await ProcessTopicAsync(conf, topic, timeout, stoppingToken);
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        // One failing topic must not prevent the remaining topics from being processed.
                        _logger.LogError(ex, "Failed to process topic {topic}", topic);
                    }
                }
            }
            catch (Exception ex)
            {
                // E.g. the broker is unreachable during metadata lookup: log and retry on the next pass
                // instead of letting the exception terminate the background service.
                _logger.LogError(ex, "Kafka to Doris synchronization pass failed");
            }

            await Task.Delay(_config.GetValue("delay", 1000 * 60), stoppingToken);
        }
    }

    /// <summary>Returns the names of all topics known to the cluster that match <paramref name="topicsRegex"/>.</summary>
    private static List<string> FindTopics(string bootstrapServers, string topicsRegex, TimeSpan timeout)
    {
        var adminConfig = new AdminClientConfig { BootstrapServers = bootstrapServers };
        using (var adminClient = new AdminClientBuilder(adminConfig).Build())
        {
            var pattern = new Regex(topicsRegex);
            return adminClient.GetMetadata(timeout).Topics
                .Select(o => o.Topic)
                .Where(name => pattern.IsMatch(name))
                .ToList();
        }
    }

    /// <summary>
    /// Consumes one batch from <paramref name="topic"/>, loads it into Doris and commits the
    /// consumer offsets only when Doris reports the load as successful.
    /// </summary>
    private async Task ProcessTopicAsync(ConsumerConfig conf, string topic, TimeSpan timeout, CancellationToken stoppingToken)
    {
        using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            try
            {
                consumer.Subscribe(topic);

                var max = Math.Max(0, _config.GetValue("max", 1000));
                var rows = new List<string>(max);
                var schema = ReadBatch(consumer, timeout, max, rows, stoppingToken);
                if (rows.Count == 0)
                {
                    return;
                }

                CreateIfNotExists(topic, schema, _config);
                if (await StreamLoadAsync(rows, stoppingToken))
                {
                    consumer.Commit();
                }
            }
            finally
            {
                // Close (rather than only Dispose) so the consumer leaves the group explicitly.
                try
                {
                    consumer.Close();
                }
                catch (KafkaException ex)
                {
                    _logger.LogWarning(ex, "Failed to close the consumer for topic {topic}", topic);
                }
            }
        }
    }

    /// <summary>
    /// Reads up to <paramref name="max"/> messages, appending the raw JSON of every object-valued
    /// <c>payload.after</c> to <paramref name="rows"/>. Stops early when a consume call times out
    /// or fails.
    /// </summary>
    /// <returns>
    /// The <c>schema</c> element of the last message that contributed a row, or the default
    /// <see cref="JsonElement"/> when no row was collected.
    /// </returns>
    private JsonElement ReadBatch(IConsumer<Ignore, string> consumer, TimeSpan timeout, int max, List<string> rows, CancellationToken stoppingToken)
    {
        JsonElement schema = default;
        for (var remaining = max; remaining > 0; remaining--)
        {
            stoppingToken.ThrowIfCancellationRequested();

            ConsumeResult<Ignore, string> consumeResult;
            try
            {
                consumeResult = consumer.Consume(timeout);
            }
            catch (ConsumeException e)
            {
                _logger.LogError(e, "Error occured: {reason}", e.Error.Reason);
                break;
            }

            if (consumeResult == null)
            {
                // Nothing arrived within the timeout: treat the batch as complete.
                break;
            }

            var value = consumeResult.Message.Value;
            if (string.IsNullOrEmpty(value))
            {
                // Messages with no value (e.g. tombstones) carry no row image; parsing them would throw.
                continue;
            }

            Debug.WriteLine(value);
            using (var json = JsonDocument.Parse(value))
            {
                var after = json.RootElement.GetProperty("payload").GetProperty("after");
                if (after.ValueKind == JsonValueKind.Object)
                {
                    rows.Add(after.GetRawText());
                    // Clone so the element stays valid after the document is disposed.
                    schema = json.RootElement.GetProperty("schema").Clone();
                }
            }
        }

        return schema;
    }

    /// <summary>Sends <paramref name="rows"/> to Doris as one JSON array via Stream Load.</summary>
    /// <returns>
    /// <c>true</c> when Doris answered HTTP 200 with status <c>Success</c> or <c>Publish Timeout</c>;
    /// otherwise <c>false</c>.
    /// </returns>
    private async Task<bool> StreamLoadAsync(IReadOnlyList<string> rows, CancellationToken stoppingToken)
    {
        var doris = _config.GetSection("doris");
        var username = doris.GetValue("username", "root");
        // NOTE(review): a default password in source is a security smell; prefer requiring it from configuration.
        var password = doris.GetValue("password", "aA123456!");
        var server = doris.GetValue("server", "http://localhost:8030");
        var database = doris.GetValue("database", "example");
        var url = $"{server}/api/{database}/User/_stream_load";

        var payload = "[\n" + string.Join(",\n", rows) + "\n]\n";
        Debug.WriteLine(payload);

        // NOTE(review): Doris' documented Stream Load examples send the file as the raw request
        // body; the multipart wrapper is kept from the original implementation - confirm Doris accepts it.
        var multiContent = new MultipartFormDataContent();
        multiContent.Add(new ByteArrayContent(Encoding.UTF8.GetBytes(payload)), "file", "example.json");

        // Disposing the request also disposes its content.
        using (var request = new HttpRequestMessage(HttpMethod.Put, url) { Content = multiContent })
        {
            var credentials = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
            request.Headers.Authorization = new AuthenticationHeaderValue("Basic", credentials);
            request.Headers.ExpectContinue = true;
            // strip_outer_array and jsonpaths only apply to JSON loads, so the format must be declared.
            request.Headers.Add("format", "json");
            request.Headers.Add("strip_outer_array", "true");
            request.Headers.Add("columns", string.Join(",", UserColumns));
            // jsonpaths is a JSON array of paths, generated from the same list as "columns" to keep them in sync.
            request.Headers.Add("jsonpaths", "[" + string.Join(", ", UserColumns.Select(c => $"\"$.{c}\"")) + "]");
            // Labels must be unique per load; a timestamp string is culture-dependent and only second-precise.
            request.Headers.Add("label", Guid.NewGuid().ToString());

            var httpClient = _httpClientFactory.CreateClient();
            using (var result = await httpClient.SendAsync(request, stoppingToken))
            {
                if (result.StatusCode != System.Net.HttpStatusCode.OK)
                {
                    _logger.LogWarning("Doris stream load returned HTTP {statusCode}", (int)result.StatusCode);
                    return false;
                }

                var responseText = await result.Content.ReadAsStringAsync();
                Debug.WriteLine(responseText);
                // Expected shape: { "TxnId": 3009, "Label": "...", "Status": "Success", "Message": "OK", ... }
                using (var response = JsonDocument.Parse(responseText))
                {
                    var root = response.RootElement;
                    var label = root.GetProperty("Label").GetString();
                    var message = root.GetProperty("Message").GetString();
                    var status = root.GetProperty("Status").GetString();
                    _logger.LogInformation("{label}:{message}", label, message);

                    var loaded = status == "Success" || status == "Publish Timeout";
                    if (!loaded)
                    {
                        _logger.LogWarning("Doris stream load {label} finished with status {status}", label, status);
                    }

                    return loaded;
                }
            }
        }
    }

    /// <summary>
    /// Builds the <c>CREATE TABLE IF NOT EXISTS</c> statement for <paramref name="topic"/> from the
    /// message schema and writes it to the debug output.
    /// </summary>
    /// <remarks>
    /// NOTE(review): despite its name this method does not execute the statement against Doris;
    /// it only emits it through <see cref="Debug.WriteLine(string)"/>.
    /// </remarks>
    /// <param name="topic">Topic name; dots are replaced by underscores to form the table name.</param>
    /// <param name="schema">The <c>schema</c> element of a consumed message.</param>
    /// <param name="config">Configuration providing <c>doris:buckets</c> and <c>doris:replication_num</c>.</param>
    private void CreateIfNotExists(string topic, JsonElement schema, IConfiguration config)
    {
        var doris = config.GetSection("doris");
        var sql = BuildCreateTableSql(
            topic.Replace(".", "_"),
            schema,
            doris.GetValue("buckets", 1),
            doris.GetValue("replication_num", 1));
        Debug.WriteLine(sql);
    }

    /// <summary>
    /// Renders the DDL for <paramref name="tableName"/> using the column list found at
    /// <c>schema.fields[0].fields</c>.
    /// </summary>
    private static string BuildCreateTableSql(string tableName, JsonElement schema, int buckets, int replicationNum)
    {
        var fields = schema.GetProperty("fields")[0].GetProperty("fields");
        var length = fields.GetArrayLength();
        var sql = new StringBuilder();
        sql.Append($"CREATE TABLE IF NOT EXISTS `{tableName}` (").Append('\n');

        var index = 0;
        foreach (var item in fields.EnumerateArray())
        {
            var type = item.GetProperty("type").GetString();
            var optional = item.GetProperty("optional").GetBoolean();
            var field = item.GetProperty("field").GetString();
            var name = item.TryGetProperty("name", out var nameElement) ? nameElement.GetString() : null;
            var notNull = !optional;

            if (!string.IsNullOrEmpty(name))
            {
                // The logical type name overrides the physical type for timestamps.
                if (name == "org.apache.kafka.connect.data.Timestamp")
                {
                    type = "DATETIME";
                }
                Debug.WriteLine($"{type}:{name}");
            }

            if (type == "string")
            {
                type = "VARCHAR(255)";
            }
            else if (type == "int16")
            {
                // Kept from the original implementation: int16 columns are always declared nullable.
                // NOTE(review): the reason is not evident from this file - confirm it is intended.
                notNull = false;
            }

            // intN maps to INT up to 32 bits and BIGINT above; an unparsable suffix leaves the type untouched.
            if (type.StartsWith("int", StringComparison.Ordinal)
                && int.TryParse(type.Substring(3), System.Globalization.NumberStyles.None, System.Globalization.CultureInfo.InvariantCulture, out var bits))
            {
                type = bits <= 32 ? "INT" : "BIGINT";
            }

            index += 1;
            sql.Append($"`{field}` {type.ToUpperInvariant()}{(notNull ? " NOT NULL" : "")} {(index < length ? "," : "")}\n");
        }

        sql.Append($")\nUNIQUE KEY(Id)\nDISTRIBUTED BY HASH(Id) BUCKETS {buckets}\nPROPERTIES(\"replication_num\" = \"{replicationNum}\");");
        return sql.ToString();
    }
}
}