You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
iot/labs/kafka/KafkaEFTest/Program.cs

199 lines
7.7 KiB

using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using MySql.Data.MySqlClient;
using Newtonsoft.Json;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaEFTest
{
/// <summary>
/// Lab program that runs two loops side by side: a background producer that saves
/// <see cref="TestEntity"/> rows through <see cref="TestDbContext"/> and publishes them to
/// Kafka inside a Kafka transaction, and a foreground consumer that reads the same topic
/// and forwards every message to <see cref="Doris"/>.
/// </summary>
internal class Program
{
    private const string KafkaBootstrapServers = "localhost:9092";
    private const string TopicName = "test-topic";
    private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Entry point: makes sure the local database exists, starts the producer loop in the
    /// background and then runs the consumer loop on the calling thread until Ctrl+C.
    /// </summary>
    private static void Main()
    {
        using (var db = new TestDbContext())
        {
            db.Database.EnsureCreated();
        }

        // Fire-and-forget on purpose: the producer keeps running until the process exits.
        Task.Run(() => ProduceLoopAsync());

        ConsumeLoop();
    }

    /// <summary>
    /// Publishes one batch roughly every 20 ms, forever. Any failure of a batch is printed
    /// and the loop carries on with the next batch.
    /// </summary>
    private static async Task ProduceLoopAsync()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = KafkaBootstrapServers,
            TransactionalId = "test",
        };

        while (true)
        {
            try
            {
                // NOTE(review): a new context and producer are built for every batch, as in
                // the original code; hoisting the producer would need fatal-error handling.
                await ProduceBatchAsync(config);
            }
            catch (Exception ex)
            {
                // Covers failures while creating or disposing the context/producer so that
                // the background task never dies silently.
                Console.WriteLine(ex.Message);
            }

            await Task.Delay(20);
        }
    }

    /// <summary>
    /// Saves two new entities to the database and publishes both to Kafka, each side inside
    /// its own transaction. On failure the error is printed first and then whichever
    /// transactions are still open are rolled back / aborted.
    /// </summary>
    /// <param name="config">Producer configuration; must carry a transactional id.</param>
    private static async Task ProduceBatchAsync(ProducerConfig config)
    {
        using (var dbContext = new TestDbContext())
        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            // Tracks whether AbortTransaction is legal: only between Begin and Commit.
            var kafkaTransactionOpen = false;
            try
            {
                var entity = new TestEntity();
                var entity1 = new TestEntity();

                producer.InitTransactions(DefaultTimeout);
                producer.BeginTransaction();
                kafkaTransactionOpen = true;

                dbContext.Database.BeginTransaction();
                dbContext.Tests.Add(entity);
                dbContext.Tests.Add(entity1);
                dbContext.SaveChanges();

                var dr = await producer.ProduceAsync(TopicName, new Message<string, string> { Key = "test_table", Value = JsonConvert.SerializeObject(entity) });
                var dr1 = await producer.ProduceAsync(TopicName, new Message<string, string> { Key = "test_table", Value = JsonConvert.SerializeObject(entity1) });

                // The two commits are not atomic: if the Kafka commit fails after the
                // database commit succeeded, the rows stay in the database.
                dbContext.Database.CommitTransaction();
                producer.CommitTransaction(DefaultTimeout);
                kafkaTransactionOpen = false;

                Console.WriteLine($"send message offset: '{dr.TopicPartitionOffset}' value: '{dr.Value}'");
                Console.WriteLine($"send message offset: '{dr1.TopicPartitionOffset}' value: '{dr1.Value}'");
            }
            catch (Exception ex) // typically DbUpdateException or ProduceException<string, string>
            {
                // Print the root cause before cleaning up so that a failing cleanup
                // cannot hide it.
                Console.WriteLine(ex.Message);
                UndoPendingWork(dbContext, producer, kafkaTransactionOpen);
            }
        }
    }

    /// <summary>
    /// Rolls back the database transaction and aborts the Kafka transaction, but only the
    /// ones that are actually open. Cleanup failures are printed, never thrown.
    /// </summary>
    private static void UndoPendingWork(TestDbContext dbContext, IProducer<string, string> producer, bool kafkaTransactionOpen)
    {
        // RollbackTransaction throws when no transaction is active (never begun, or
        // already committed), so check first.
        if (dbContext.Database.CurrentTransaction != null)
        {
            try
            {
                dbContext.Database.RollbackTransaction();
            }
            catch (Exception rollbackError)
            {
                Console.WriteLine(rollbackError.Message);
            }
        }

        if (kafkaTransactionOpen)
        {
            try
            {
                producer.AbortTransaction(DefaultTimeout);
            }
            catch (Exception abortError)
            {
                Console.WriteLine(abortError.Message);
            }
        }
    }

    /// <summary>
    /// Consumes the topic until Ctrl+C is pressed. Every message is passed to
    /// <see cref="Doris"/> and its offset is committed afterwards; an exception thrown by
    /// <see cref="Doris"/> is not handled here and ends the loop before the commit.
    /// </summary>
    private static void ConsumeLoop()
    {
        var conf = new ConsumerConfig
        {
            GroupId = "group_test",
            BootstrapServers = KafkaBootstrapServers,
            EnableAutoCommit = false,
            StatisticsIntervalMs = 5000,
            SessionTimeoutMs = 6000,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnablePartitionEof = true
        };

        using (var consumer = new ConsumerBuilder<string, string>(conf).Build())
        {
            consumer.Subscribe(TopicName);

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, args) =>
            {
                args.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cts.Token);
                        if (consumeResult.IsPartitionEOF)
                        {
                            Console.WriteLine(
                                $"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                            continue;
                        }

                        Console.WriteLine($"group received message offset: '{consumeResult.TopicPartitionOffset}' key: {consumeResult.Message.Key} value: '{consumeResult.Message.Value}'");

                        // Read the payload through Message, like the log line above.
                        Doris(consumeResult.Message.Value);

                        try
                        {
                            consumer.Commit(consumeResult);
                        }
                        catch (KafkaException commitError)
                        {
                            Console.WriteLine($"Commit error: {commitError.Error.Reason}");
                        }
                    }
                    catch (ConsumeException consumeError)
                    {
                        Console.WriteLine($"Error occured: {consumeError.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                consumer.Close();
            }
        }
    }

    // Setup script for the target table (run once against the target server):
    //create database test;
    //CREATE USER 'usr' IDENTIFIED BY 'pwd';
    //GRANT ALL ON test TO usr;
    //use test;
    //CREATE TABLE IF NOT EXISTS test
    //(
    // `id` VARCHAR(64),
    // `value` VARCHAR(128),
    // `number` INT
    //)
    //UNIQUE KEY(`id`)
    //distributed by hash(id) buckets 1
    //properties(
    // "replication_num" = "1"
    //);

    /// <summary>
    /// Deserializes a JSON payload into a <see cref="TestEntity"/> and inserts it into the
    /// <c>test</c> table over a MySQL-protocol connection on port 9030 (presumably an
    /// Apache Doris front end, going by the method name and the DDL above).
    /// </summary>
    /// <param name="value">JSON text of a <see cref="TestEntity"/>.</param>
    /// <remarks>
    /// Any failure is reported by printing the exception type name and is then rethrown.
    /// </remarks>
    private static void Doris(string value)
    {
        try
        {
            var entity = JsonConvert.DeserializeObject<TestEntity>(value);
            if (entity == null)
            {
                // A JSON "null" payload deserializes to null; fail with a clear message
                // instead of a NullReferenceException further down.
                throw new InvalidOperationException("Message payload did not deserialize to a TestEntity.");
            }

            var connectionString = "Server=localhost;Port=9030;Database=test;Uid=usr;Pwd=pwd;";
            using (var conn = new MySqlConnection(connectionString))
            {
                conn.Open();
                using (var cmd = conn.CreateCommand())
                {
                    cmd.CommandTimeout = 60;
                    // Parameters instead of string interpolation: the values come from a
                    // Kafka message and must not be able to alter the statement.
                    cmd.CommandText = "insert into test (`id`,`value`,`number`) values (@id,@value,@number)";
                    cmd.Parameters.AddWithValue("@id", entity.Id.ToString());
                    cmd.Parameters.AddWithValue("@value", entity.Value);
                    cmd.Parameters.AddWithValue("@number", entity.Number);
                    cmd.ExecuteNonQuery();
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.GetType().FullName);
            // Errors seen during setup:
            //"Unknown database 'default_cluster:test'"
            //errCode = 2, detailMessage = Unknown table 'test'
            throw;
        }
    }
}
/// <summary>
/// EF Core context for the lab: exposes the <see cref="TestEntity"/> set and stores it in
/// the SQLite file <c>data.db</c>.
/// </summary>
public class TestDbContext : DbContext
{
    /// <summary>Set of <see cref="TestEntity"/> rows tracked by this context.</summary>
    public DbSet<TestEntity> Tests { get; set; }

    /// <summary>Points the context at the SQLite data source <c>data.db</c>.</summary>
    /// <param name="optionsBuilder">Builder supplied by EF Core for this context.</param>
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        => optionsBuilder.UseSqlite("Data Source=data.db");
}
/// <summary>
/// Sample row/message payload. A new instance gets a fresh <see cref="Id"/>, the current
/// local tick count as <see cref="Value"/> and a random non-negative <see cref="Number"/>.
/// </summary>
public class TestEntity
{
    // One shared generator: seeding a new Random from the clock for every instance gives
    // identical numbers to entities created within the same tick.
    private static readonly Random NumberSource = new Random();

    // Random instances are not thread-safe, so access to the shared one is serialized.
    private static readonly object NumberSourceGate = new object();

    /// <summary>Creates an entity populated with generated values.</summary>
    public TestEntity()
    {
        this.Id = Guid.NewGuid();
        this.Value = DateTime.Now.Ticks.ToString();
        lock (NumberSourceGate)
        {
            this.Number = NumberSource.Next();
        }
    }

    /// <summary>Unique identifier, generated in the constructor.</summary>
    public Guid Id { get; set; }

    /// <summary>Local-time tick count at construction, as text.</summary>
    public string Value { get; set; }

    /// <summary>Random non-negative integer assigned in the constructor.</summary>
    public int Number { get; set; }
}
}