You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
iot/labs/kafka/KafkaEFTest/Program.cs

201 lines
7.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using MySql.Data.MySqlClient;
using Newtonsoft.Json;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaEFTest
{
internal class Program
{
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);
// Kafka topic written by the producer loop and read by the consumer loop.
private const string TopicName = "test-topic";

// Broker address shared by the producer and consumer configurations.
private const string KafkaBootstrapServers = "localhost:9092";

/// <summary>
/// Entry point: makes sure the SQLite schema exists, starts the transactional
/// producer loop on a background task, then runs the consumer loop on the
/// calling thread until Ctrl+C is pressed.
/// </summary>
private static void Main()
{
    using (var db = new TestDbContext())
    {
        db.Database.EnsureCreated();
    }

    // Fire-and-forget: the loop logs its own failures and never completes.
    Task.Run(() => ProduceLoopAsync());

    ConsumeLoop();
}

/// <summary>
/// Endlessly writes two <see cref="TestEntity"/> rows to SQLite and publishes
/// them to Kafka, wrapping each pair in a database transaction and a Kafka
/// transaction. A fresh context and producer are built on every iteration.
/// </summary>
private static async Task ProduceLoopAsync()
{
    var config = new ProducerConfig
    {
        BootstrapServers = KafkaBootstrapServers,
        TransactionalId = "test",
    };

    while (true)
    {
        try
        {
            using (var dbContext = new TestDbContext())
            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                var entity = new TestEntity();
                var entity1 = new TestEntity();

                // A failure here means no transaction is open yet, so there is
                // nothing to roll back or abort; it is reported by the outer catch.
                producer.InitTransactions(DefaultTimeout);
                producer.BeginTransaction();

                try
                {
                    // Disposing an uncommitted database transaction rolls it back,
                    // so the catch below only has to deal with the Kafka side.
                    using (var dbTransaction = dbContext.Database.BeginTransaction())
                    {
                        dbContext.Tests.Add(entity);
                        dbContext.Tests.Add(entity1);
                        dbContext.SaveChanges();

                        var dr = await producer.ProduceAsync(TopicName, new Message<string, string> { Key = "test_table", Value = JsonConvert.SerializeObject(entity) });
                        var dr1 = await producer.ProduceAsync(TopicName, new Message<string, string> { Key = "test_table", Value = JsonConvert.SerializeObject(entity1) });

                        // The database commits first; if the Kafka commit below fails,
                        // the rows stay committed while the messages are aborted.
                        dbTransaction.Commit();
                        producer.CommitTransaction(DefaultTimeout);

                        Console.WriteLine($"send message offset: '{dr.TopicPartitionOffset}' value: '{dr.Value}'");
                        Console.WriteLine($"send message offset: '{dr1.TopicPartitionOffset}' value: '{dr1.Value}'");
                    }
                }
                catch (Exception ex) // e.g. DbUpdateException, ProduceException<string, string>
                {
                    // Log the root cause first so it is not lost if the abort itself fails.
                    Console.WriteLine(ex.Message);
                    producer.AbortTransaction(DefaultTimeout);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

        await Task.Delay(20);
    }
}

/// <summary>
/// Consumes <c>test-topic</c> with manual offset commits, forwarding every
/// message value to <see cref="Doris"/> before committing its offset.
/// Returns after Ctrl+C cancels the blocking consume call; any other
/// unhandled exception propagates to the caller after the consumer is closed.
/// </summary>
private static void ConsumeLoop()
{
    var conf = new ConsumerConfig
    {
        GroupId = "group_test",
        BootstrapServers = KafkaBootstrapServers,
        EnableAutoCommit = false,
        StatisticsIntervalMs = 5000,
        SessionTimeoutMs = 6000,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnablePartitionEof = true
    };

    using (var consumer = new ConsumerBuilder<string, string>(conf).Build())
    {
        consumer.Subscribe(TopicName);

        // Not disposed on purpose: the Ctrl+C handler below may still fire late.
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
        };

        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cts.Token);
                    if (consumeResult.IsPartitionEOF)
                    {
                        Console.WriteLine(
                            $"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                        continue;
                    }

                    Console.WriteLine($"group received message offset: '{consumeResult.TopicPartitionOffset}' key: {consumeResult.Message.Key} value: '{consumeResult.Message.Value}'");

                    // Doris rethrows on failure, so the offset below is only
                    // committed for messages that were stored successfully.
                    Doris(consumeResult.Message.Value);

                    try
                    {
                        consumer.Commit(consumeResult);
                    }
                    catch (KafkaException e)
                    {
                        Console.WriteLine($"Commit error: {e.Error.Reason}");
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occured: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl+C was pressed; fall through to the clean shutdown below.
        }
        finally
        {
            // Leave the consumer group cleanly on every exit path, not only on Ctrl+C.
            consumer.Close();
        }
    }
}
//create database test;
//CREATE USER 'usr' IDENTIFIED BY 'pwd';
//GRANT ALL ON test TO usr;
//use test;
//CREATE TABLE IF NOT EXISTS test
//(
// `id` VARCHAR(64),
// `value` VARCHAR(128),
// `number` INT
//)
//UNIQUE KEY(`id`)
//distributed by hash(id) buckets 1
//properties(
// "replication_num" = "1"
//);
/// <summary>
/// Deserializes <paramref name="value"/> into a <see cref="TestEntity"/> and
/// inserts it into the <c>test</c> table over a MySQL-protocol connection
/// (presumably an Apache Doris front end, judging by the method name).
/// </summary>
/// <param name="value">JSON text describing a <see cref="TestEntity"/>.</param>
/// <remarks>
/// Any failure is logged by exception type name and rethrown, so the caller
/// decides whether to continue.
/// </remarks>
private static void Doris(string value)
{
    try
    {
        var entity = JsonConvert.DeserializeObject<TestEntity>(value);
        if (entity == null)
        {
            // A JSON "null" payload deserializes to null; fail with a clear message
            // instead of a NullReferenceException further down.
            throw new InvalidOperationException("Message value did not deserialize to a TestEntity.");
        }

        var connectionString = "Server=localhost;Port=9030;Database=test;Uid=usr;Pwd=pwd;";
        using (var conn = new MySqlConnection(connectionString))
        {
            conn.Open();
            using (var cmd = conn.CreateCommand())
            {
                cmd.CommandTimeout = 60;
                // No UPDATE is needed: inserting a row with an existing primary key is merged automatically.
                // DELETE is possible but not recommended; prefer updating a flag column instead.
                // Values come from a Kafka message, so they are bound as parameters rather than
                // spliced into the SQL text.
                cmd.CommandText = "insert into test (`id`,`value`,`number`) values (@id,@value,@number)";
                cmd.Parameters.AddWithValue("@id", entity.Id.ToString());
                cmd.Parameters.AddWithValue("@value", entity.Value);
                cmd.Parameters.AddWithValue("@number", entity.Number);
                cmd.ExecuteNonQuery();
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.GetType().FullName);
        // Errors seen so far:
        //"Unknown database 'default_cluster:test'"
        //errCode = 2, detailMessage = Unknown table 'test'
        throw;
    }
}
}
/// <summary>
/// EF Core context for the test program, exposing the <see cref="TestEntity"/>
/// set and storing it in the local SQLite file <c>data.db</c>.
/// </summary>
public class TestDbContext : DbContext
{
    /// <summary>Rows of <see cref="TestEntity"/> tracked by this context.</summary>
    public DbSet<TestEntity> Tests { get; set; }

    /// <summary>Points the context at the SQLite database file.</summary>
    /// <param name="optionsBuilder">Builder supplied by EF Core.</param>
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        => optionsBuilder.UseSqlite("Data Source=data.db");
}
/// <summary>
/// Sample row written to SQLite, serialized to Kafka as JSON and inserted
/// into the downstream <c>test</c> table. A new instance gets a fresh id,
/// the current local tick count as text, and a random number.
/// </summary>
public class TestEntity
{
    // One shared generator instead of a time-seeded Random per instance:
    // instances created within the same clock tick would otherwise share a
    // seed and therefore the same Number.
    private static readonly Random NumberSource = new Random();

    // Random is not thread-safe, and entities are constructed on more than one
    // thread (the producer task and JSON deserialization in the consumer).
    private static readonly object NumberSourceLock = new object();

    /// <summary>Creates an entity populated with generated values.</summary>
    public TestEntity()
    {
        this.Id = Guid.NewGuid();
        this.Value = DateTime.Now.Ticks.ToString();
        lock (NumberSourceLock)
        {
            this.Number = NumberSource.Next();
        }
    }

    /// <summary>Unique identifier, generated with <see cref="Guid.NewGuid"/>.</summary>
    public Guid Id { get; set; }

    /// <summary>Local-time tick count at construction, as a decimal string.</summary>
    public string Value { get; set; }

    /// <summary>Non-negative pseudo-random number.</summary>
    public int Number { get; set; }
}
}