Former-commit-id: c9a0b80225dded0911580a27af2aa430c4c35e9c
Former-commit-id: cb56782b60d681fb2a6479c81438ff369e015e77
1.0
wanggang 4 years ago
parent a9db2f01b0
commit 8b7ae5c74d

@ -4,7 +4,10 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@ -35,10 +38,11 @@ namespace Kafka2Doris
var conf = new ConsumerConfig
{
BootstrapServers = _config.GetValue("server", "localhost:9092"),
GroupId = $"kafka2doris",
AutoOffsetReset = AutoOffsetReset.Earliest
GroupId = $"doris-v1",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit=false
};
var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout",30));
var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout",5));
try
{
using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
@ -59,8 +63,17 @@ namespace Kafka2Doris
{
try
{
var consumeResult = consumer.Consume(cts.Token);
list.Add(consumeResult.Message.Value);
var consumeResult = consumer.Consume(timeout);
if(consumeResult == null)
{
break;
}
var json = JsonDocument.Parse(consumeResult.Message.Value);
var after = json.RootElement.GetProperty("payload").GetProperty("after");
if (after.ValueKind == JsonValueKind.Object)
{
list.Add(after.GetRawText());
}
}
catch (ConsumeException e)
{
@ -72,6 +85,24 @@ namespace Kafka2Doris
if(list.Count>0)
{
var httpClient = this._httpClientFactory.CreateClient();
httpClient.DefaultRequestHeaders.Add("label", DateTime.Now.ToString());
var url = $"{_config.GetValue("doris","http://localhost:8030")}/api/example/User/_stream_load";
using (var multiContent = new MultipartFormDataContent())
{
using (var ms = new MemoryStream())
{
using(var sw = new StreamWriter(ms))
{
sw.WriteLine("");
//{ "Id":"49fa11e7-4404-4fe5-9873-e89a01d8e529","UserName":"super1","SecurityStamp":"123456","PasswordHash":"579f889441b4a55d667233941d72a83ed644f7e5","PasswordConfirmed":1,"Email":"super@test.com","EmailConfirmed":1,"PhoneNumber":null,"PhoneNumberConfirmed":0,"RealName":null,"IdentityNumber":null,"IdentityConfirmed":0,"NickName":"超级管理员","Avatar":null,"Sex":null,"Birthday":null,"LockoutEnabled":0,"AccessFailedCount":0,"LockoutEnd":null,"RowVersion":"1e59e461-af12-446f-b876-eac8f58d3c79","Created":1618377036611675,"Modified":null,"Deleted":null}
var data = ms.ToArray();
var fileContent = new ByteArrayContent(data);
multiContent.Add(fileContent, "file", "input.csv");
var result = httpClient.PostAsync(url, multiContent).Result;//401 http basic
}
}
}
//httpClient.PutAsync
//consumer.Commit();
}

@ -5,6 +5,6 @@
"Microsoft": "Warning"
}
},
"kafka": "localhost:9092",
"kafka": "192.168.100.144:9092",
"topics": "mysql.example.User"
}
Loading…
Cancel
Save