diff --git a/labs/kafka/KafkaEFTest/KafkaEFTest.csproj b/labs/kafka/KafkaEFTest/KafkaEFTest.csproj index 730ddf42..c4370a67 100644 --- a/labs/kafka/KafkaEFTest/KafkaEFTest.csproj +++ b/labs/kafka/KafkaEFTest/KafkaEFTest.csproj @@ -1,4 +1,4 @@ - + Exe @@ -8,7 +8,7 @@ + - - + \ No newline at end of file diff --git a/labs/kafka/KafkaEFTest/Program.cs b/labs/kafka/KafkaEFTest/Program.cs index 00038372..d2821e20 100644 --- a/labs/kafka/KafkaEFTest/Program.cs +++ b/labs/kafka/KafkaEFTest/Program.cs @@ -1,5 +1,7 @@ using Confluent.Kafka; using Microsoft.EntityFrameworkCore; +using MySqlConnector; +using Newtonsoft.Json; using System; using System.Threading; using System.Threading.Tasks; @@ -8,8 +10,9 @@ namespace KafkaEFTest { internal class Program { - static TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30); - private static void Main(string[] args) + private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30); + + private static void Main() { using (var db = new TestDbContext()) { @@ -17,31 +20,37 @@ namespace KafkaEFTest } Task.Run(async () => { + var topic = "test-topic"; + var config = new ProducerConfig + { + BootstrapServers = "localhost:9092", + TransactionalId = "test", + }; while (true) { - var entity = new TestEntity(); - using (var db = new TestDbContext()) + using (var dbContext = new TestDbContext()) { try { - using (var p = new ProducerBuilder(new ProducerConfig { BootstrapServers = "localhost:9092", TransactionalId = entity.Id.ToString() }).Build()) + using (var producer = new ProducerBuilder(config).Build()) { try { - p.InitTransactions(DefaultTimeout); - p.BeginTransaction(); - db.Database.BeginTransaction(); - db.Tests.Add(entity); - db.SaveChanges(); - var dr = await p.ProduceAsync("test-topic", new Message { Value = entity.Id + ":" + entity.Value }); - db.Database.CommitTransaction(); - p.CommitTransaction(DefaultTimeout); - Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'"); + var entity = new TestEntity(); + producer.InitTransactions(DefaultTimeout); + producer.BeginTransaction(); + dbContext.Database.BeginTransaction(); + dbContext.Tests.Add(entity); + dbContext.SaveChanges(); + var dr = await producer.ProduceAsync(topic, new Message { Key = "test_table", Value = JsonConvert.SerializeObject(entity) }); + dbContext.Database.CommitTransaction(); + producer.CommitTransaction(DefaultTimeout); + Console.WriteLine($"send message offset: '{dr.TopicPartitionOffset}' value: '{dr.Value}"); } catch (Exception ex)//DbUpdateException//ProduceException { - db.Database.RollbackTransaction(); - p.AbortTransaction(DefaultTimeout); + dbContext.Database.RollbackTransaction(); + producer.AbortTransaction(DefaultTimeout); Console.WriteLine(ex.Message); } } @@ -57,13 +66,17 @@ namespace KafkaEFTest }); var conf = new ConsumerConfig { - GroupId = "test-consumer-group", + GroupId = "group_test", BootstrapServers = "localhost:9092", - AutoOffsetReset = AutoOffsetReset.Earliest + EnableAutoCommit = false, + StatisticsIntervalMs = 5000, + SessionTimeoutMs = 6000, + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true }; - using (var c = new ConsumerBuilder(conf).Build()) + using (var consumer = new ConsumerBuilder(conf).Build()) { - c.Subscribe("test-topic"); + consumer.Subscribe("test-topic"); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => @@ -78,8 +91,26 @@ namespace KafkaEFTest { try { - var cr = c.Consume(cts.Token); - Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); + var consumeResult = consumer.Consume(cts.Token); + if (consumeResult.IsPartitionEOF) + { + Console.WriteLine( + $"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); + continue; + } + + Console.WriteLine($"group received message offset: '{consumeResult.TopicPartitionOffset}' key: {consumeResult.Message.Key} value: '{consumeResult.Message.Value}'"); + + Doris(consumeResult.Value); + + try + { + consumer.Commit(consumeResult); + } + catch (KafkaException e) + { + Console.WriteLine($"Commit error: {e.Error.Reason}"); + } } catch (ConsumeException e) { @@ -90,10 +121,51 @@ namespace KafkaEFTest catch (OperationCanceledException) { // Ensure the consumer leaves the group cleanly and final offsets are committed. - c.Close(); + consumer.Close(); } } } + + //create database test; + //CREATE USER 'usr' IDENTIFIED BY 'pwd'; + //GRANT ALL ON test TO usr; + //CREATE TABLE IF NOT EXISTS test + //( + // `id` VARCHAR(64), + // `value` VARCHAR(128), + // `number` INT + //) + //UNIQUE KEY(`id`) + //distributed by hash(id) buckets 1 + //properties( + // "replication_num" = "1" + //); + + private static void Doris(string value) + { + try + { + var entity = JsonConvert.DeserializeObject(value); + var connectionString = "Server=localhost;Port=9030;Database=test;Uid=usr;Pwd=pwd;"; + using (var conn = new MySqlConnection(connectionString)) + { + conn.Open(); + using (var cmd = conn.CreateCommand()) + { + cmd.CommandTimeout = 60; + cmd.CommandText = $"insert into test (`id`,`value`,`number`) values ('{entity.Id}','{entity.Value}',{entity.Number})"; + cmd.ExecuteNonQuery(); + } + } + } + catch (Exception ex) + { + Console.WriteLine(ex.GetType().FullName); + //"Unknown database 'default_cluster:test'" + //errCode = 2, detailMessage = Unknown table 'test' + throw; + } + } } public class TestDbContext : DbContext @@ -112,9 +184,11 @@ namespace KafkaEFTest { this.Id = Guid.NewGuid(); this.Value = DateTime.Now.Ticks.ToString(); + this.Number = new Random((int)DateTime.Now.Ticks).Next(); } public Guid Id { get; set; } public string Value { get; set; } + public int Number { get; set; } } } \ No newline at end of file diff --git a/labs/kafka/docker-compose.yml b/labs/kafka/docker-compose.yml index c8705ceb..fdf241a4 100644 --- a/labs/kafka/docker-compose.yml +++ b/labs/kafka/docker-compose.yml @@ -8,27 +8,55 @@ networks: services: zookeeper: image: wurstmeister/zookeeper:latest + restart: always ports: - 2181:2181 networks: default: - ipv4_address: 172.172.0.171 + ipv4_address: 172.172.0.201 kafka: image: wurstmeister/kafka:2.13-2.6.0 restart: always environment: - KAFKA_ZOOKEEPER_CONNECT: 172.172.0.171:2181 - KAFKA_ADVERTISED_HOST_NAME: 172.172.0.170 + KAFKA_ZOOKEEPER_CONNECT: 172.172.0.201:2181 + KAFKA_ADVERTISED_HOST_NAME: localhost + KAFKA_HEAP_OPTS: "-Xmx256m -Xms256m" + volumes: + - ./docker/data/kafka:/kafka + ports: + - 9092:9092 + networks: + default: + ipv4_address: 172.172.0.210 + kafka-manager: + image: sheepkiller/kafka-manager + restart: always + environment: + ZK_HOSTS: 172.172.0.201 + KAFKA_BROKERS: 172.172.0.210:9092 + ports: + - "9010:9000" + networks: + default: + ipv4_address: 172.172.0.220 + doris: + image: doris:0.12.0 + restart: always + environment: + TZ: "Asia/Shanghai" volumes: - - /etc/localtime:/etc/localtime - - /var/run/docker.sock:/var/run/docker.sock + #- ./conf/fe.conf:/opt/fe/conf/fe.conf - ./log/fe:/opt/fe/log - ./data/fe/doris-meta:/opt/fe/doris-meta - - ./conf/be.conf:/opt/be/conf/be.conf + #- ./conf/be.conf:/opt/be/conf/be.conf - ./data/be/storage:/opt/be/storage ports: - - 9092:9092 + - 8081:8081 + - 8030:8030 + - 9030:9030 + - 9050:9050 command: bash -c "/opt/fe/bin/start_fe.sh & /opt/be/bin/start_be.sh" + #mysql -h 127.0.0.1 -P9030 -u root -e 'ALTER SYSTEM ADD BACKEND "172.172.0.30:9050"' networks: default: - ipv4_address: 172.172.0.170 + ipv4_address: 172.172.0.30 diff --git a/labs/kafka/start.cmd b/labs/kafka/start.cmd new file mode 100644 index 00000000..9534090b --- /dev/null +++ b/labs/kafka/start.cmd @@ -0,0 +1 @@ +docker-compose -f docker-compose.yml up --remove-orphans --force-recreate -d \ No newline at end of file diff --git a/labs/kafka/stop.cmd b/labs/kafka/stop.cmd new file mode 100644 index 00000000..356959e4 --- /dev/null +++ b/labs/kafka/stop.cmd @@ -0,0 +1 @@ +docker-compose down --remove-orphans \ No newline at end of file diff --git a/publish/dslx/linux-x64/publish/docker/conf/website/nginx.conf b/publish/dslx/linux-x64/publish/docker/conf/website/nginx.conf index e371f872..06c184ae 100644 --- a/publish/dslx/linux-x64/publish/docker/conf/website/nginx.conf +++ b/publish/dslx/linux-x64/publish/docker/conf/website/nginx.conf @@ -1,4 +1,4 @@ -user root; +user root; worker_processes 4; #error_log logs/error.log; #error_log logs/error.log notice; @@ -16,26 +16,32 @@ http { keepalive_timeout 65; types { application/vnd.android.package-archive apk; - application/iphone pxl ipa; - text/plain plist; + application/iphone pxl ipa; + text/plain plist; } upstream gateway { server 172.172.0.12; } + server { + listen 80; + server_name iot.edusoa.com; + return 301 https://$host$request_uri; + } + server { #listen 80; listen 443; server_name iot.edusoa.com; - ssl on; - ssl_certificate edusoa.pem; - ssl_certificate_key edusoa.key; - ssl_session_timeout 5m; - ssl_protocols TLSv1 TLSv1.1 TLSv1.2; - ssl_ciphers HIGH:!RC4:!MD5:!aNULL:!eNULL:!NULL:!DH:!EDH:!EXP:+MEDIUM; - ssl_prefer_server_ciphers on; + ssl on; + ssl_certificate edusoa.pem; + ssl_certificate_key edusoa.key; + ssl_session_timeout 5m; + ssl_protocols TLSv1 TLSv1.1 TLSv1.2; + ssl_ciphers HIGH:!RC4:!MD5:!aNULL:!eNULL:!NULL:!DH:!EDH:!EXP:+MEDIUM; + ssl_prefer_server_ciphers on; location / { root /root/nginx/html/desktop;