You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
iot/labs/doris/Kafka2Doris/Worker.cs

138 lines
7.4 KiB

using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace Kafka2Doris
{
    /// <summary>
    /// Background worker that, on every cycle, drains one batch of messages from each configured
    /// Kafka topic, extracts the <c>payload.after</c> object of every message and posts the
    /// collected rows to the Doris <c>_stream_load</c> endpoint.
    /// </summary>
    /// <remarks>
    /// Configuration keys read (defaults in parentheses): <c>topics</c> (comma separated, no default),
    /// <c>server</c> (localhost:9092), <c>timeout</c> in seconds (5), <c>max</c> messages per batch (1000),
    /// <c>username</c> (root), <c>password</c>, <c>doris</c> (http://localhost:8030) and
    /// <c>delay</c> between cycles in milliseconds (60000).
    /// </remarks>
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly IConfiguration _config;
        private readonly IHttpClientFactory _httpClientFactory;

        /// <summary>Creates the worker with its logging, configuration and HTTP dependencies.</summary>
        /// <param name="logger">Logger used for all diagnostics of this worker.</param>
        /// <param name="config">Application configuration holding the keys listed on the class.</param>
        /// <param name="httpClientFactory">Factory used to create the client that talks to Doris.</param>
        public Worker(ILogger<Worker> logger, IConfiguration config, IHttpClientFactory httpClientFactory)
        {
            _logger = logger;
            _config = config;
            _httpClientFactory = httpClientFactory;
        }

        /// <summary>
        /// Runs transfer cycles until <paramref name="stoppingToken"/> is cancelled, waiting
        /// <c>delay</c> milliseconds between cycles.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // The Kafka consume calls below are synchronous; yield first so the host is not
            // blocked in StartAsync for the duration of the whole first cycle.
            await Task.Yield();

            while (!stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);

                var topicSetting = _config["topics"];
                var topics = string.IsNullOrWhiteSpace(topicSetting)
                    ? Array.Empty<string>()
                    : topicSetting.Split(',').Select(t => t.Trim()).Where(t => t.Length > 0).ToArray();
                if (topics.Length == 0)
                {
                    _logger.LogWarning("No topics configured; set the 'topics' configuration key.");
                }

                foreach (var topic in topics)
                {
                    if (stoppingToken.IsCancellationRequested)
                    {
                        break;
                    }

                    try
                    {
                        await TransferTopicAsync(topic, stoppingToken);
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        // Shutdown requested while transferring: stop quietly.
                        break;
                    }
                    catch (Exception ex)
                    {
                        // One failing topic must not prevent the remaining topics from being processed.
                        _logger.LogError(ex, "Failed to transfer topic {topic}", topic);
                    }
                }

                await Task.Delay(_config.GetValue("delay", 1000 * 60), stoppingToken);
            }
        }

        /// <summary>
        /// Consumes one batch from <paramref name="topic"/>, uploads it, and commits the consumed
        /// offsets only when the upload was confirmed (or when there was nothing to upload).
        /// </summary>
        private async Task TransferTopicAsync(string topic, CancellationToken stoppingToken)
        {
            var conf = new ConsumerConfig
            {
                BootstrapServers = _config.GetValue("server", "localhost:9092"),
                GroupId = "doris-v1",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false
            };
            var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout", 5));
            var max = _config.GetValue("max", 1000);

            using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                try
                {
                    consumer.Subscribe(topic);

                    var rows = new List<string>(Math.Max(max, 0));
                    var consumed = ReadBatch(consumer, timeout, max, rows, stoppingToken);
                    if (consumed == 0)
                    {
                        return;
                    }

                    // Auto-commit is disabled, so without an explicit commit every cycle would
                    // re-read the same messages from the earliest offset. Messages that yielded no
                    // row (tombstones, deletes, malformed JSON) are committed too, otherwise a
                    // batch consisting only of such messages would be re-read forever.
                    if (rows.Count == 0 || await UploadAsync(rows, stoppingToken))
                    {
                        consumer.Commit();
                    }
                }
                finally
                {
                    // Leave the consumer group explicitly so the group rebalances promptly.
                    try
                    {
                        consumer.Close();
                    }
                    catch (KafkaException ex)
                    {
                        _logger.LogWarning(ex, "Failed to close the consumer for topic {topic}", topic);
                    }
                }
            }
        }

        /// <summary>
        /// Consumes up to <paramref name="max"/> messages, stopping early when a consume call times
        /// out, fails, or shutdown is requested. Extracted rows are appended to <paramref name="rows"/>.
        /// </summary>
        /// <returns>The number of messages consumed, including those that produced no row.</returns>
        private int ReadBatch(IConsumer<Ignore, string> consumer, TimeSpan timeout, int max, List<string> rows, CancellationToken stoppingToken)
        {
            var consumed = 0;
            while (consumed < max && !stoppingToken.IsCancellationRequested)
            {
                ConsumeResult<Ignore, string> result;
                try
                {
                    result = consumer.Consume(timeout);
                }
                catch (ConsumeException e)
                {
                    _logger.LogError(e, "Error occurred: {reason}", e.Error.Reason);
                    break;
                }

                if (result == null)
                {
                    // Nothing arrived within the timeout: the batch is complete.
                    break;
                }

                consumed++;
                if (TryExtractAfter(result.Message?.Value, out var row))
                {
                    rows.Add(row);
                }
            }

            return consumed;
        }

        /// <summary>
        /// Extracts the raw JSON text of <c>payload.after</c> from a message value.
        /// </summary>
        /// <returns>
        /// <c>true</c> when <c>payload.after</c> exists and is a JSON object; <c>false</c> for null or
        /// empty values, invalid JSON, or messages without such an object.
        /// </returns>
        private bool TryExtractAfter(string message, out string row)
        {
            row = null;
            if (string.IsNullOrEmpty(message))
            {
                return false;
            }

            try
            {
                using (var json = JsonDocument.Parse(message))
                {
                    var root = json.RootElement;
                    // TryGetProperty throws on non-object elements, hence the ValueKind checks.
                    if (root.ValueKind == JsonValueKind.Object
                        && root.TryGetProperty("payload", out var payload)
                        && payload.ValueKind == JsonValueKind.Object
                        && payload.TryGetProperty("after", out var after)
                        && after.ValueKind == JsonValueKind.Object)
                    {
                        // GetRawText returns a copy, so it stays valid after the document is disposed.
                        row = after.GetRawText();
                        return true;
                    }
                }
            }
            catch (JsonException ex)
            {
                _logger.LogWarning(ex, "Skipping a message that is not valid JSON");
            }

            return false;
        }

        /// <summary>
        /// Posts the rows, one JSON object per line, as the <c>file</c> part of a multipart request
        /// to the Doris <c>_stream_load</c> endpoint.
        /// </summary>
        /// <returns><c>true</c> only when the response explicitly confirms the load.</returns>
        private async Task<bool> UploadAsync(List<string> rows, CancellationToken stoppingToken)
        {
            var httpClient = _httpClientFactory.CreateClient();
            var username = _config.GetValue("username", "root");
            // NOTE(review): a default password in source is a security risk; it is kept only so
            // existing deployments that rely on it keep working. Prefer supplying it via configuration.
            var password = _config.GetValue("password", "aA123456!");
            httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}")));
            // NOTE(review): this label is culture dependent and only unique per second; confirm it
            // satisfies Doris' label rules.
            httpClient.DefaultRequestHeaders.Add("label", DateTime.Now.ToString());
            var url = $"{_config.GetValue("doris", "http://localhost:8030")}/api/example/User/_stream_load";

            // NOTE(review): the original code noted a 401 from this call. The Doris manual describes
            // stream load as an HTTP PUT of the raw data with extra headers; verify that this
            // multipart POST is accepted before relying on it.
            var data = Encoding.UTF8.GetBytes(string.Join("\n", rows));
            using (var multiContent = new MultipartFormDataContent())
            {
                multiContent.Add(new ByteArrayContent(data), "file", "input.csv");
                using (var response = await httpClient.PostAsync(url, multiContent, stoppingToken))
                {
                    var body = await response.Content.ReadAsStringAsync();
                    if (response.IsSuccessStatusCode && IsLoadConfirmed(body))
                    {
                        _logger.LogInformation("Loaded {count} rows into Doris", rows.Count);
                        return true;
                    }

                    _logger.LogWarning("Doris did not confirm the load of {count} rows: HTTP {status} {body}", rows.Count, (int)response.StatusCode, body);
                    return false;
                }
            }
        }

        /// <summary>
        /// Checks whether a response body is a JSON object whose <c>Status</c> field (matched
        /// case-insensitively) reports "Success" or "Publish Timeout". Anything else, including a
        /// body that is not JSON, is treated as unconfirmed so that offsets are not committed
        /// for data that may not have been loaded.
        /// </summary>
        private static bool IsLoadConfirmed(string body)
        {
            if (string.IsNullOrWhiteSpace(body))
            {
                return false;
            }

            try
            {
                using (var doc = JsonDocument.Parse(body))
                {
                    if (doc.RootElement.ValueKind != JsonValueKind.Object)
                    {
                        return false;
                    }

                    foreach (var property in doc.RootElement.EnumerateObject())
                    {
                        if (!string.Equals(property.Name, "Status", StringComparison.OrdinalIgnoreCase)
                            || property.Value.ValueKind != JsonValueKind.String)
                        {
                            continue;
                        }

                        var status = property.Value.GetString();
                        return string.Equals(status, "Success", StringComparison.OrdinalIgnoreCase)
                            || string.Equals(status, "Publish Timeout", StringComparison.OrdinalIgnoreCase);
                    }
                }
            }
            catch (JsonException)
            {
                // A body that is not JSON carries no confirmation; the caller logs the body.
                return false;
            }

            return false;
        }
    }
}