using Confluent.Kafka; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Net.Http; using System.Threading; using System.Threading.Tasks; namespace Kafka2Doris { public class Worker : BackgroundService { private readonly ILogger _logger; private readonly IConfiguration _config; private readonly IHttpClientFactory _httpClientFactory; public Worker(ILogger logger, IConfiguration config,IHttpClientFactory httpClientFactory) { this._logger = logger; this._config = config; this._httpClientFactory = httpClientFactory; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now); // var topics = _config["topics"].Split(','); foreach (var topic in topics) { var conf = new ConsumerConfig { BootstrapServers = _config.GetValue("server", "localhost:9092"), GroupId = $"kafka2doris", AutoOffsetReset = AutoOffsetReset.Earliest }; var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout",30)); try { using (var consumer = new ConsumerBuilder(conf).Build()) { try { consumer.Subscribe(topic); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; try { var max = _config.GetValue("max", 1000); var list = new List(max); while (max>0) { try { var consumeResult = consumer.Consume(cts.Token); list.Add(consumeResult.Message.Value); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); break; } max -= 1; } if(list.Count>0) { var httpClient = this._httpClientFactory.CreateClient(); //httpClient.PutAsync //consumer.Commit(); } } catch (OperationCanceledException) { // Ensure the consumer leaves the group cleanly and final offsets are committed. consumer.Close(); } } catch (Exception ex)//DbUpdateException//ProduceException { Console.WriteLine(ex.Message); } } } catch (Exception ex) { this._logger.LogError(ex.ToString()); } } // await Task.Delay(this._config.GetValue("delay", 1000 * 60), stoppingToken); } } } }