using Confluent.Kafka; using Microsoft.EntityFrameworkCore; using MySql.Data.MySqlClient; using Newtonsoft.Json; using System; using System.Threading; using System.Threading.Tasks; namespace KafkaEFTest { internal class Program { private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30); private static void Main() { using (var db = new TestDbContext()) { db.Database.EnsureCreated(); } Task.Run(async () => { var topic = "test-topic"; var config = new ProducerConfig { BootstrapServers = "localhost:9092", TransactionalId = "test", }; while (true) { using (var dbContext = new TestDbContext()) { try { using (var producer = new ProducerBuilder(config).Build()) { try { var entity = new TestEntity(); var entity1 = new TestEntity(); producer.InitTransactions(DefaultTimeout); producer.BeginTransaction(); dbContext.Database.BeginTransaction(); dbContext.Tests.Add(entity); dbContext.Tests.Add(entity1); dbContext.SaveChanges(); var dr = await producer.ProduceAsync(topic, new Message { Key = "test_table", Value = JsonConvert.SerializeObject(entity) }); var dr1 = await producer.ProduceAsync(topic, new Message { Key = "test_table", Value = JsonConvert.SerializeObject(entity1) }); dbContext.Database.CommitTransaction(); producer.CommitTransaction(DefaultTimeout); Console.WriteLine($"send message offset: '{dr.TopicPartitionOffset}' value: '{dr.Value}"); Console.WriteLine($"send message offset: '{dr1.TopicPartitionOffset}' value: '{dr1.Value}"); } catch (Exception ex)//DbUpdateException//ProduceException { dbContext.Database.RollbackTransaction(); producer.AbortTransaction(DefaultTimeout); Console.WriteLine(ex.Message); } } } catch (Exception ex) { Console.WriteLine(ex.Message); } } await Task.Delay(20); } }); var conf = new ConsumerConfig { GroupId = "group_test", BootstrapServers = "localhost:9092", EnableAutoCommit = false, StatisticsIntervalMs = 5000, SessionTimeoutMs = 6000, AutoOffsetReset = AutoOffsetReset.Earliest, EnablePartitionEof = true }; using (var consumer = new ConsumerBuilder(conf).Build()) { consumer.Subscribe("test-topic"); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; try { while (true) { try { var consumeResult = consumer.Consume(cts.Token); if (consumeResult.IsPartitionEOF) { Console.WriteLine( $"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); continue; } Console.WriteLine($"group received message offset: '{consumeResult.TopicPartitionOffset}' key: {consumeResult.Message.Key} value: '{consumeResult.Message.Value}'"); Doris(consumeResult.Value); try { consumer.Commit(consumeResult); } catch (KafkaException e) { Console.WriteLine($"Commit error: {e.Error.Reason}"); } } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { // Ensure the consumer leaves the group cleanly and final offsets are committed. consumer.Close(); } } } //create database test; //CREATE USER 'usr' IDENTIFIED BY 'pwd'; //GRANT ALL ON test TO usr; //use test; //CREATE TABLE IF NOT EXISTS test //( // `id` VARCHAR(64), // `value` VARCHAR(128), // `number` INT //) //UNIQUE KEY(`id`) //distributed by hash(id) buckets 1 //properties( // "replication_num" = "1" //); private static void Doris(string value) { try { var entity = JsonConvert.DeserializeObject(value); var connectionString = "Server=localhost;Port=9030;Database=test;Uid=usr;Pwd=pwd;"; using (var conn = new MySqlConnection(connectionString)) { conn.Open(); using (var cmd = conn.CreateCommand()) { cmd.CommandTimeout = 60; //不需要update,只需要插入,主键相同的自动合并 //可以delete,不建议delete,建议更新标志位 cmd.CommandText = $"insert into test (`id`,`value`,`number`) values ('{entity.Id}','{entity.Value}',{entity.Number})"; cmd.ExecuteNonQuery(); } } } catch (Exception ex) { Console.WriteLine(ex.GetType().FullName); //"Unknown database 'default_cluster:test'" //errCode = 2, detailMessage = Unknown table 'test' throw; } } } public class TestDbContext : DbContext { public DbSet Tests { get; set; } protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { optionsBuilder.UseSqlite("Data Source=data.db"); } } public class TestEntity { public TestEntity() { this.Id = Guid.NewGuid(); this.Value = DateTime.Now.Ticks.ToString(); this.Number = new Random((int)DateTime.Now.Ticks).Next(); } public Guid Id { get; set; } public string Value { get; set; } public int Number { get; set; } } }