mysql-kafka-doris测试,保证mysql和kafka数据一致性,保证kafka消费和doris插入一致性

Former-commit-id: 2696e1069107a323606e5f61c8b2648d25052fc4
Former-commit-id: 455b7785ffe86394872d2b470ebf52d34e62a684
TSXN
wanggang 5 years ago
parent 7d5572d932
commit ab97b31e64

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<OutputType>Exe</OutputType> <OutputType>Exe</OutputType>
@ -8,7 +8,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.5.1" /> <PackageReference Include="Confluent.Kafka" Version="1.5.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="3.1.8" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="3.1.8" />
<PackageReference Include="MySql.Data" Version="8.0.21" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" /> <PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
</ItemGroup> </ItemGroup>
</Project> </Project>

@ -1,5 +1,7 @@
using Confluent.Kafka; using Confluent.Kafka;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using MySqlConnector;
using Newtonsoft.Json;
using System; using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -8,8 +10,9 @@ namespace KafkaEFTest
{ {
internal class Program internal class Program
{ {
static TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30); private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);
private static void Main(string[] args)
private static void Main()
{ {
using (var db = new TestDbContext()) using (var db = new TestDbContext())
{ {
@ -17,31 +20,37 @@ namespace KafkaEFTest
} }
Task.Run(async () => Task.Run(async () =>
{ {
var topic = "test-topic";
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
TransactionalId = "test",
};
while (true) while (true)
{ {
var entity = new TestEntity(); using (var dbContext = new TestDbContext())
using (var db = new TestDbContext())
{ {
try try
{ {
using (var p = new ProducerBuilder<Null, string>(new ProducerConfig { BootstrapServers = "localhost:9092", TransactionalId = entity.Id.ToString() }).Build()) using (var producer = new ProducerBuilder<string, string>(config).Build())
{ {
try try
{ {
p.InitTransactions(DefaultTimeout); var entity = new TestEntity();
p.BeginTransaction(); producer.InitTransactions(DefaultTimeout);
db.Database.BeginTransaction(); producer.BeginTransaction();
db.Tests.Add(entity); dbContext.Database.BeginTransaction();
db.SaveChanges(); dbContext.Tests.Add(entity);
var dr = await p.ProduceAsync("test-topic", new Message<Null, string> { Value = entity.Id + ":" + entity.Value }); dbContext.SaveChanges();
db.Database.CommitTransaction(); var dr = await producer.ProduceAsync(topic, new Message<string, string> { Key = "test_table", Value = JsonConvert.SerializeObject(entity) });
p.CommitTransaction(DefaultTimeout); dbContext.Database.CommitTransaction();
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'"); producer.CommitTransaction(DefaultTimeout);
Console.WriteLine($"send message offset: '{dr.TopicPartitionOffset}' value: '{dr.Value}");
} }
catch (Exception ex)//DbUpdateException//ProduceException<Null,string> catch (Exception ex)//DbUpdateException//ProduceException<Null,string>
{ {
db.Database.RollbackTransaction(); dbContext.Database.RollbackTransaction();
p.AbortTransaction(DefaultTimeout); producer.AbortTransaction(DefaultTimeout);
Console.WriteLine(ex.Message); Console.WriteLine(ex.Message);
} }
} }
@ -57,13 +66,17 @@ namespace KafkaEFTest
}); });
var conf = new ConsumerConfig var conf = new ConsumerConfig
{ {
GroupId = "test-consumer-group", GroupId = "group_test",
BootstrapServers = "localhost:9092", BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest EnableAutoCommit = false,
StatisticsIntervalMs = 5000,
SessionTimeoutMs = 6000,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnablePartitionEof = true
}; };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build()) using (var consumer = new ConsumerBuilder<string, string>(conf).Build())
{ {
c.Subscribe("test-topic"); consumer.Subscribe("test-topic");
CancellationTokenSource cts = new CancellationTokenSource(); CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => Console.CancelKeyPress += (_, e) =>
@ -78,8 +91,26 @@ namespace KafkaEFTest
{ {
try try
{ {
var cr = c.Consume(cts.Token); var consumeResult = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); if (consumeResult.IsPartitionEOF)
{
Console.WriteLine(
$"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
Console.WriteLine($"group received message offset: '{consumeResult.TopicPartitionOffset}' key: {consumeResult.Message.Key} value: '{consumeResult.Message.Value}'");
Doris(consumeResult.Value);
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine($"Commit error: {e.Error.Reason}");
}
} }
catch (ConsumeException e) catch (ConsumeException e)
{ {
@ -90,9 +121,50 @@ namespace KafkaEFTest
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
// Ensure the consumer leaves the group cleanly and final offsets are committed. // Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close(); consumer.Close();
}
} }
} }
//create database test;
//CREATE USER 'usr' IDENTIFIED BY 'pwd';
//GRANT ALL ON test TO usr;
//CREATE TABLE IF NOT EXISTS test
//(
// `id` VARCHAR(64),
// `value` VARCHAR(128),
// `number` INT
//)
//UNIQUE KEY(`id`)
//distributed by hash(id) buckets 1
//properties(
// "replication_num" = "1"
//);
// Inserts one consumed Kafka message into the Doris `test` table over the
// MySQL wire protocol (Doris FE listens on 9030).
// `value` is the JSON-serialized TestEntity written by the producer loop.
// On failure the exception type is logged and rethrown, so the caller's
// consumer.Commit(...) (which runs only after this returns) is skipped and
// the message will be redelivered — this is what gives at-least-once
// kafka-to-doris delivery; the UNIQUE KEY(`id`) on the table deduplicates.
private static void Doris(string value)
{
    try
    {
        var entity = JsonConvert.DeserializeObject<TestEntity>(value);
        var connectionString = "Server=localhost;Port=9030;Database=test;Uid=usr;Pwd=pwd;";
        using (var conn = new MySqlConnection(connectionString))
        {
            conn.Open();
            using (var cmd = conn.CreateCommand())
            {
                cmd.CommandTimeout = 60;
                // Parameterized instead of string-interpolated: the payload
                // arrives from Kafka, so interpolating entity.Value into the
                // SQL text was an injection / quote-breaking hazard.
                cmd.CommandText = "insert into test (`id`,`value`,`number`) values (@id,@value,@number)";
                cmd.Parameters.AddWithValue("@id", entity.Id.ToString());
                cmd.Parameters.AddWithValue("@value", entity.Value);
                cmd.Parameters.AddWithValue("@number", entity.Number);
                cmd.ExecuteNonQuery();
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.GetType().FullName);
        //"Unknown database 'default_cluster:test'"
        //errCode = 2, detailMessage = Unknown table 'test'
        throw;
    }
}
} }
@ -112,9 +184,11 @@ namespace KafkaEFTest
{ {
this.Id = Guid.NewGuid(); this.Id = Guid.NewGuid();
this.Value = DateTime.Now.Ticks.ToString(); this.Value = DateTime.Now.Ticks.ToString();
this.Number = new Random((int)DateTime.Now.Ticks).Next();
} }
public Guid Id { get; set; } public Guid Id { get; set; }
public string Value { get; set; } public string Value { get; set; }
public int Number { get; set; }
} }
} }

@ -8,27 +8,55 @@ networks:
services: services:
zookeeper: zookeeper:
image: wurstmeister/zookeeper:latest image: wurstmeister/zookeeper:latest
restart: always
ports: ports:
- 2181:2181 - 2181:2181
networks: networks:
default: default:
ipv4_address: 172.172.0.171 ipv4_address: 172.172.0.201
kafka: kafka:
image: wurstmeister/kafka:2.13-2.6.0 image: wurstmeister/kafka:2.13-2.6.0
restart: always restart: always
environment: environment:
KAFKA_ZOOKEEPER_CONNECT: 172.172.0.171:2181 KAFKA_ZOOKEEPER_CONNECT: 172.172.0.201:2181
KAFKA_ADVERTISED_HOST_NAME: 172.172.0.170 KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_HEAP_OPTS: "-Xmx256m -Xms256m"
volumes:
- ./docker/data/kafka:/kafka
ports:
- 9092:9092
networks:
default:
ipv4_address: 172.172.0.210
kafka-manager:
image: sheepkiller/kafka-manager
restart: always
environment:
ZK_HOSTS: 172.172.0.201
KAFKA_BROKERS: 172.172.0.210:9092
ports:
- "9010:9000"
networks:
default:
ipv4_address: 172.172.0.220
doris:
image: doris:0.12.0
restart: always
environment:
TZ: "Asia/Shanghai"
volumes: volumes:
- /etc/localtime:/etc/localtime #- ./conf/fe.conf:/opt/fe/conf/fe.conf
- /var/run/docker.sock:/var/run/docker.sock
- ./log/fe:/opt/fe/log - ./log/fe:/opt/fe/log
- ./data/fe/doris-meta:/opt/fe/doris-meta - ./data/fe/doris-meta:/opt/fe/doris-meta
- ./conf/be.conf:/opt/be/conf/be.conf #- ./conf/be.conf:/opt/be/conf/be.conf
- ./data/be/storage:/opt/be/storage - ./data/be/storage:/opt/be/storage
ports: ports:
- 9092:9092 - 8081:8081
- 8030:8030
- 9030:9030
- 9050:9050
command: bash -c "/opt/fe/bin/start_fe.sh & /opt/be/bin/start_be.sh" command: bash -c "/opt/fe/bin/start_fe.sh & /opt/be/bin/start_be.sh"
#mysql -h 127.0.0.1 -P9030 -u root -e 'ALTER SYSTEM ADD BACKEND "172.172.0.30:9050"'
networks: networks:
default: default:
ipv4_address: 172.172.0.170 ipv4_address: 172.172.0.30

@ -0,0 +1 @@
docker-compose -f docker-compose.yml up --remove-orphans --force-recreate -d

@ -0,0 +1 @@
docker-compose down --remove-orphans

@ -24,6 +24,12 @@ http {
server 172.172.0.12; server 172.172.0.12;
} }
server {
listen 80;
server_name iot.edusoa.com;
return 301 https://$host$request_uri;
}
server { server {
#listen 80; #listen 80;
listen 443; listen 443;

Loading…
Cancel
Save