using Confluent.Kafka; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Net.Http; using System.Net.Http.Headers; using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; namespace Kafka2Doris { public class Worker : BackgroundService { private readonly ILogger _logger; private readonly IConfiguration _config; private readonly IHttpClientFactory _httpClientFactory; public Worker(ILogger logger, IConfiguration config, IHttpClientFactory httpClientFactory) { this._logger = logger; this._config = config; this._httpClientFactory = httpClientFactory; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now); // var topics = _config.GetSection("kafka").GetValue("topics", "").Split(','); foreach (var topic in topics) { var conf = new ConsumerConfig { BootstrapServers = _config.GetSection("kafka").GetValue("host", "localhost:9092"), GroupId = $"doris-v1", AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false }; var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout", 5)); try { using (var consumer = new ConsumerBuilder(conf).Build()) { try { consumer.Subscribe(topic); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; try { var max = _config.GetValue("max", 1000); var list = new List(max); while (max > 0) { try { var consumeResult = consumer.Consume(timeout); if (consumeResult == null) { break; } var json = JsonDocument.Parse(consumeResult.Message.Value); var after = json.RootElement.GetProperty("payload").GetProperty("after"); if (after.ValueKind == JsonValueKind.Object) { list.Add(after.GetRawText()); } } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); break; } max -= 1; } if (list.Count > 0) { var httpClient = this._httpClientFactory.CreateClient(); var username = _config.GetSection("doris").GetValue("username", "root"); var password = _config.GetSection("doris").GetValue("password", "aA123456!"); httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"))); httpClient.DefaultRequestHeaders.Add("Expect", "100-continue"); httpClient.DefaultRequestHeaders.Add("strip_outer_array", "true"); httpClient.DefaultRequestHeaders.Add("strip_outer_array", "true"); httpClient.DefaultRequestHeaders.Add("columns", "UserName,SecurityStamp,PasswordHash,PasswordConfirmed,Email,EmailConfirmed,PhoneNumber,PhoneNumberConfirmed,RealName,IdentityNumber,IdentityConfirmed,NickName,Avatar,Sex,Birthday,LockoutEnabled,AccessFailedCount,LockoutEnd,RowVersion,Created,Modified,Deleted"); httpClient.DefaultRequestHeaders.Add("jsonpaths", "\"$.UserName\", \"$.SecurityStamp\", \"$.PasswordHash`\", \"$.PasswordConfirmed\", \"$.Email\", \"$.EmailConfirmed\", \"$.PhoneNumber\", \"$.PhoneNumberConfirmed\", \"$.RealName\", \"$.IdentityNumber\", \"$.IdentityConfirmed\", \"$.NickName\", \"$.Avatar\", \"$.Sex\", \"$.Birthday\", \"$.LockoutEnabled\", \"$.AccessFailedCount\", \"$.LockoutEnd\", \"$.RowVersion\", \"$.Created\", \"$.Modified\", \"$.Deleted\""); httpClient.DefaultRequestHeaders.Add("label", DateTime.Now.ToString()); var server = _config.GetSection("doris").GetValue("server", "http://localhost:8030"); var database = _config.GetSection("doris").GetValue("database", "example"); var url = $"{server}/api/{database}/User/_stream_load"; using (var multiContent = new MultipartFormDataContent()) { using (var ms = new MemoryStream()) { using (var sw = new StreamWriter(ms)) { sw.WriteLine("["); for (int i = 0; i < list.Count; i++) { var item = list[i]; sw.Write($"{item}"); if (i < list.Count - 1) { sw.WriteLine(','); } else { sw.WriteLine(); } } sw.WriteLine("]"); sw.Flush(); var data = ms.ToArray(); Debug.WriteLine(Encoding.UTF8.GetString(data)); var fileContent = new ByteArrayContent(data); multiContent.Add(fileContent, "file", "example.json"); var result = httpClient.PutAsync(url, multiContent).Result; if (result.StatusCode == System.Net.HttpStatusCode.OK) { var responseText = result.Content.ReadAsStringAsync().Result; Debug.WriteLine(responseText); var response = JsonDocument.Parse(responseText); var label = response.RootElement.GetProperty("Label").GetRawText(); var message = response.RootElement.GetProperty("Message").GetRawText(); this._logger.LogInformation($"{label}:{message}"); var status = response.RootElement.GetProperty("Status").GetRawText(); if (status == "Success" || status == "Publish Timeout") { consumer.Commit(); } } //{ // "TxnId": 3009, // "Label": "c1f15903-805d-4d8a-8533-4c96080c49fe", // "Status": "Success", // "Message": "OK", // "NumberTotalRows": 3, // "NumberLoadedRows": 3, // "NumberFilteredRows": 0, // "NumberUnselectedRows": 0, // "LoadBytes": 1678, // "LoadTimeMs": 689, // "BeginTxnTimeMs": 1, // "StreamLoadPutTimeMs": 4, // "ReadDataTimeMs": 0, // "WriteDataTimeMs": 257, // "CommitAndPublishTimeMs": 421 //} } } } } } catch (OperationCanceledException) { // Ensure the consumer leaves the group cleanly and final offsets are committed. consumer.Close(); } } catch (Exception ex)//DbUpdateException//ProduceException { Console.WriteLine(ex.Message); } } } catch (Exception ex) { this._logger.LogError(ex.ToString()); } } // await Task.Delay(this._config.GetValue("delay", 1000 * 60), stoppingToken); } } } }