diff --git a/labs/doris/Kafka2Doris/Worker.cs b/labs/doris/Kafka2Doris/Worker.cs index 425e246a..ee2898d2 100644 --- a/labs/doris/Kafka2Doris/Worker.cs +++ b/labs/doris/Kafka2Doris/Worker.cs @@ -4,7 +4,10 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.IO; +using System.Linq; using System.Net.Http; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -35,10 +38,11 @@ namespace Kafka2Doris var conf = new ConsumerConfig { BootstrapServers = _config.GetValue("server", "localhost:9092"), - GroupId = $"kafka2doris", - AutoOffsetReset = AutoOffsetReset.Earliest + GroupId = $"doris-v1", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit=false }; - var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout",30)); + var timeout = TimeSpan.FromSeconds(_config.GetValue("timeout",5)); try { using (var consumer = new ConsumerBuilder(conf).Build()) @@ -59,8 +63,17 @@ namespace Kafka2Doris { try { - var consumeResult = consumer.Consume(cts.Token); - list.Add(consumeResult.Message.Value); + var consumeResult = consumer.Consume(timeout); + if(consumeResult == null) + { + break; + } + var json = JsonDocument.Parse(consumeResult.Message.Value); + var after = json.RootElement.GetProperty("payload").GetProperty("after"); + if (after.ValueKind == JsonValueKind.Object) + { + list.Add(after.GetRawText()); + } } catch (ConsumeException e) { @@ -72,6 +85,24 @@ namespace Kafka2Doris if(list.Count>0) { var httpClient = this._httpClientFactory.CreateClient(); + httpClient.DefaultRequestHeaders.Add("label", DateTime.Now.ToString()); + var url = $"{_config.GetValue("doris","http://localhost:8030")}/api/example/User/_stream_load"; + using (var multiContent = new MultipartFormDataContent()) + { + using (var ms = new MemoryStream()) + { + using(var sw = new StreamWriter(ms)) + { + sw.WriteLine(""); + //{ "Id":"49fa11e7-4404-4fe5-9873-e89a01d8e529","UserName":"super1","SecurityStamp":"123456","PasswordHash":"579f889441b4a55d667233941d72a83ed644f7e5","PasswordConfirmed":1,"Email":"super@test.com","EmailConfirmed":1,"PhoneNumber":null,"PhoneNumberConfirmed":0,"RealName":null,"IdentityNumber":null,"IdentityConfirmed":0,"NickName":"超级管理员","Avatar":null,"Sex":null,"Birthday":null,"LockoutEnabled":0,"AccessFailedCount":0,"LockoutEnd":null,"RowVersion":"1e59e461-af12-446f-b876-eac8f58d3c79","Created":1618377036611675,"Modified":null,"Deleted":null} + var data = ms.ToArray(); + var fileContent = new ByteArrayContent(data); + multiContent.Add(fileContent, "file", "input.csv"); + var result = httpClient.PostAsync(url, multiContent).Result;//401 http basic + } + } + + } //httpClient.PutAsync //consumer.Commit(); } diff --git a/labs/doris/Kafka2Doris/appsettings.json b/labs/doris/Kafka2Doris/appsettings.json index 33903040..0f8512d7 100644 --- a/labs/doris/Kafka2Doris/appsettings.json +++ b/labs/doris/Kafka2Doris/appsettings.json @@ -5,6 +5,6 @@ "Microsoft": "Warning" } }, - "kafka": "localhost:9092", + "kafka": "192.168.100.144:9092", "topics": "mysql.example.User" } \ No newline at end of file