diff --git a/labs/doris/Kafka2Doris/Worker.cs b/labs/doris/Kafka2Doris/Worker.cs index 3e7657a3..2160201c 100644 --- a/labs/doris/Kafka2Doris/Worker.cs +++ b/labs/doris/Kafka2Doris/Worker.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; +using System.Linq; using System.Collections.Generic; using System.Diagnostics; using System.IO; @@ -10,6 +11,7 @@ using System.Net.Http; using System.Net.Http.Headers; using System.Text; using System.Text.Json; +using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; @@ -33,18 +35,26 @@ namespace Kafka2Doris while (!stoppingToken.IsCancellationRequested) { _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now); - // - var topics = _config.GetSection("kafka").GetValue("topics", "").Split(','); + + var conf = new ConsumerConfig + { + BootstrapServers = _config.GetSection("kafka").GetValue("host", "localhost:9092"), + GroupId = $"doris-v1", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false + }; + var topicsRegex = _config.GetSection("kafka").GetValue("topics.regex", "mysql.example.*"); + var timeout = TimeSpan.FromSeconds(_config.GetSection("kafka").GetValue("timeout", 20)); + + var topics = new List(); + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = conf.BootstrapServers }).Build()) + { + var meta = adminClient.GetMetadata(timeout); + topics = meta.Topics.Where(o => Regex.IsMatch(o.Topic, topicsRegex)).Select(o => o.Topic).ToList(); + } + foreach (var topic in topics) { - var conf = new ConsumerConfig - { - BootstrapServers = _config.GetSection("kafka").GetValue("host", "localhost:9092"), - GroupId = $"doris-v1", - AutoOffsetReset = AutoOffsetReset.Earliest, - EnableAutoCommit = false - }; - var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout", 5)); try { using (var consumer = new ConsumerBuilder(conf).Build()) @@ -62,6 +72,7 @@ namespace Kafka2Doris { var max = _config.GetValue("max", 1000); var list = new List(max); + JsonElement schema = default; while (max > 0) { try @@ -71,7 +82,9 @@ namespace Kafka2Doris { break; } + Debug.WriteLine(consumeResult.Message.Value); var json = JsonDocument.Parse(consumeResult.Message.Value); + schema = json.RootElement.GetProperty("schema"); var after = json.RootElement.GetProperty("payload").GetProperty("after"); if (after.ValueKind == JsonValueKind.Object) { @@ -87,6 +100,7 @@ namespace Kafka2Doris } if (list.Count > 0) { + CreateIfNotExists(topic, schema,_config); var httpClient = this._httpClientFactory.CreateClient(); var username = _config.GetSection("doris").GetValue("username", "root"); var password = _config.GetSection("doris").GetValue("password", "aA123456!"); @@ -133,10 +147,10 @@ namespace Kafka2Doris var responseText = result.Content.ReadAsStringAsync().Result; Debug.WriteLine(responseText); var response = JsonDocument.Parse(responseText); - var label = response.RootElement.GetProperty("Label").GetRawText(); - var message = response.RootElement.GetProperty("Message").GetRawText(); + var label = response.RootElement.GetProperty("Label").GetString(); + var message = response.RootElement.GetProperty("Message").GetString(); this._logger.LogInformation($"{label}:{message}"); - var status = response.RootElement.GetProperty("Status").GetRawText(); + var status = response.RootElement.GetProperty("Status").GetString(); if (status == "Success" || status == "Publish Timeout") { consumer.Commit(); @@ -186,5 +200,59 @@ namespace Kafka2Doris await Task.Delay(this._config.GetValue("delay", 1000 * 60), stoppingToken); } } + + private void CreateIfNotExists(string topic, JsonElement schema,IConfiguration config) + { + var tableName = topic.Replace(".", "_"); + var sql = $"CREATE TABLE IF NOT EXISTS `{tableName}` ("; + sql += "\n"; + var fields = schema.GetProperty("fields")[0].GetProperty("fields"); + var length = fields.GetArrayLength(); + var index = 0; + var buckets = _config.GetSection("doris").GetValue("buckets", 1); + var replication_num = _config.GetSection("doris").GetValue("replication_num", 1); + foreach (var item in fields.EnumerateArray()) + { + var type = item.GetProperty("type").GetString(); + var optional = item.GetProperty("optional").GetBoolean(); + var field = item.GetProperty("field").GetString(); + var name = item.TryGetProperty("name", out var nameElement)?nameElement.GetString():null; + var notNull = !optional; + if (!string.IsNullOrEmpty(name)) + { + if(name== "org.apache.kafka.connect.data.Timestamp") + { + type = "DATETIME"; + } + Debug.WriteLine($"{type}:{name}"); + } + if(type=="string") + { + type = "VARCHAR(255)"; + } + else if(type=="int16") + { + if(notNull) + { + notNull = false; + } + } + if(type.StartsWith("int")) + { + if(int.Parse(type.Substring(3)) <= 32) + { + type = "INT"; + } + else + { + type = "BIGINT"; + } + } + index += 1; + sql += $"`{field}` {type.ToUpper()}{(notNull ? " NOT NULL" : "")} {(index