From a9db2f01b0c19202b83aa1bd10999d96b294d6f3 Mon Sep 17 00:00:00 2001 From: wanggang <76527413@qq.com> Date: Sat, 8 May 2021 16:09:48 +0800 Subject: [PATCH] update Former-commit-id: 6eedbeba4d2534f6bf7d217b5cd226fcbaea3839 Former-commit-id: 5e6031bb8b5f96a8221c754050245602f5d30d34 --- labs/doris/.env | 1 + labs/doris/Kafka2Doris/Kafka2Doris.csproj | 7 +++ labs/doris/Kafka2Doris/Program.cs | 27 +++++--- labs/doris/Kafka2Doris/Worker.cs | 77 ++++++++++++++++++++++- labs/doris/Kafka2Doris/appsettings.json | 12 +++- labs/doris/docker-compose.yml | 4 +- 6 files changed, 112 insertions(+), 16 deletions(-) diff --git a/labs/doris/.env b/labs/doris/.env index e69de29b..fb5c742f 100644 --- a/labs/doris/.env +++ b/labs/doris/.env @@ -0,0 +1 @@ +export IP=192.168.100.144 \ No newline at end of file diff --git a/labs/doris/Kafka2Doris/Kafka2Doris.csproj b/labs/doris/Kafka2Doris/Kafka2Doris.csproj index 35f2fa42..d2ca6798 100644 --- a/labs/doris/Kafka2Doris/Kafka2Doris.csproj +++ b/labs/doris/Kafka2Doris/Kafka2Doris.csproj @@ -8,6 +8,13 @@ + + + + + + PreserveNewest + diff --git a/labs/doris/Kafka2Doris/Program.cs b/labs/doris/Kafka2Doris/Program.cs index 83813438..e84e4eb5 100644 --- a/labs/doris/Kafka2Doris/Program.cs +++ b/labs/doris/Kafka2Doris/Program.cs @@ -2,26 +2,39 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using System; +using System.IO; +using System.Reflection; +using System.Runtime.InteropServices; namespace Kafka2Doris { - internal class Program + public class Program { private static void Main(string[] args) { + var isWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows); + var rid = isWindows ? "win-x64" : "linux-x64"; + var file = isWindows ? "librdkafka.dll" : "librdkafka.so"; + var path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "runtimes",rid, "native", file); + Confluent.Kafka.Library.Load(path); + + var config = new ConfigurationBuilder() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json") + .AddEnvironmentVariables() + .AddCommandLine(args) + .Build(); Host.CreateDefaultBuilder(args) .ConfigureLogging(o => { o.AddConsole(); }) - .ConfigureAppConfiguration((hostingContext, configuration) => - { - configuration - .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true) - .AddJsonFile($"appsettings.{hostingContext.HostingEnvironment.EnvironmentName}.json", true, true); - }) .ConfigureServices((hostingContext, services) => { + services.AddLogging(); + services.AddSingleton(config); + services.AddHttpClient(); services.AddHostedService(); }) .Build() diff --git a/labs/doris/Kafka2Doris/Worker.cs b/labs/doris/Kafka2Doris/Worker.cs index 579e20ad..425e246a 100644 --- a/labs/doris/Kafka2Doris/Worker.cs +++ b/labs/doris/Kafka2Doris/Worker.cs @@ -1,7 +1,10 @@ -using Microsoft.Extensions.Configuration; +using Confluent.Kafka; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; +using System.Collections.Generic; +using System.Net.Http; using System.Threading; using System.Threading.Tasks; @@ -11,11 +14,13 @@ namespace Kafka2Doris { private readonly ILogger _logger; private readonly IConfiguration _config; + private readonly IHttpClientFactory _httpClientFactory; - public Worker(ILogger logger, IConfiguration config) + public Worker(ILogger logger, IConfiguration config,IHttpClientFactory httpClientFactory) { this._logger = logger; this._config = config; + this._httpClientFactory = httpClientFactory; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -23,7 +28,73 @@ namespace Kafka2Doris while (!stoppingToken.IsCancellationRequested) { _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now); - //读取kafka + // + var topics = _config["topics"].Split(','); + foreach (var topic in topics) + { + var conf = new ConsumerConfig + { + BootstrapServers = _config.GetValue("server", "localhost:9092"), + GroupId = $"kafka2doris", + AutoOffsetReset = AutoOffsetReset.Earliest + }; + var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout",30)); + try + { + using (var consumer = new ConsumerBuilder(conf).Build()) + { + try + { + consumer.Subscribe(topic); + CancellationTokenSource cts = new CancellationTokenSource(); + Console.CancelKeyPress += (_, e) => { + e.Cancel = true; // prevent the process from terminating. + cts.Cancel(); + }; + try + { + var max = _config.GetValue("max", 1000); + var list = new List(max); + while (max>0) + { + try + { + var consumeResult = consumer.Consume(cts.Token); + list.Add(consumeResult.Message.Value); + } + catch (ConsumeException e) + { + Console.WriteLine($"Error occured: {e.Error.Reason}"); + break; + } + max -= 1; + } + if(list.Count>0) + { + var httpClient = this._httpClientFactory.CreateClient(); + //httpClient.PutAsync + //consumer.Commit(); + } + } + catch (OperationCanceledException) + { + // Ensure the consumer leaves the group cleanly and final offsets are committed. + consumer.Close(); + } + } + catch (Exception ex)//DbUpdateException//ProduceException + { + Console.WriteLine(ex.Message); + } + } + } + catch (Exception ex) + { + this._logger.LogError(ex.ToString()); + } + } + + // await Task.Delay(this._config.GetValue("delay", 1000 * 60), stoppingToken); } } diff --git a/labs/doris/Kafka2Doris/appsettings.json b/labs/doris/Kafka2Doris/appsettings.json index e07b357d..33903040 100644 --- a/labs/doris/Kafka2Doris/appsettings.json +++ b/labs/doris/Kafka2Doris/appsettings.json @@ -1,4 +1,10 @@ { - "delay": 60, - "group": 1000 -} + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft": "Warning" + } + }, + "kafka": "localhost:9092", + "topics": "mysql.example.User" +} \ No newline at end of file diff --git a/labs/doris/docker-compose.yml b/labs/doris/docker-compose.yml index c8c2c5d6..bf5d8921 100644 --- a/labs/doris/docker-compose.yml +++ b/labs/doris/docker-compose.yml @@ -31,7 +31,7 @@ services: environment: KAFKA_HEAP_OPTS: "-Xmx512m -Xms512m" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_HOST_NAME: 172.172.0.21 + KAFKA_ADVERTISED_HOST_NAME: ${ip} num.partitions: 1 volumes: - ./data/kafka:/kafka @@ -116,7 +116,6 @@ services: environment: - priority_networks=172.172.0.0/24 volumes: - - ./apps/doris:/doris - ./conf/doris/fe.conf:/doris/fe/conf/fe.conf - ./log/doris/fe:/doris/fe/log - ./data/doris/fe/doris-meta:/doris/fe/doris-meta @@ -134,7 +133,6 @@ services: environment: - priority_networks=172.172.0.0/24 volumes: - - ./apps/doris:/doris - ./conf/doris/be.conf:/doris/be/conf/be.conf - ./data/doris/be/storage:/doris/be/storage - ./log/doris/be/:/doris/be/log