From 4fc6796af2d18a558e24efadd709a4e03090ceb9 Mon Sep 17 00:00:00 2001 From: wanggang <76527413@qq.com> Date: Fri, 30 Apr 2021 16:59:49 +0800 Subject: [PATCH] update Former-commit-id: 98a5b85745805470e3b25b3a395ee10c6d27c010 Former-commit-id: 0df7418d08c46f8e78d520cd79c89e56b86b17d6 --- .../kafka-connect/kafka2elasticsearch.json | 13 +++++++-- .../doris/conf/kafka-connect/mysql2kafka.json | 2 +- labs/doris/conf/kafka-connect/start.sh | 6 +++- labs/doris/readme.md | 28 ++++++++++++++++++- 4 files changed, 44 insertions(+), 5 deletions(-) diff --git a/labs/doris/conf/kafka-connect/kafka2elasticsearch.json b/labs/doris/conf/kafka-connect/kafka2elasticsearch.json index 045e6190..cf4fd686 100644 --- a/labs/doris/conf/kafka-connect/kafka2elasticsearch.json +++ b/labs/doris/conf/kafka-connect/kafka2elasticsearch.json @@ -4,8 +4,17 @@ "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max": 1, "topics": "mysql.example.User", - "key.ignore": true, + "key.ignore": false, + "key.converter.schemas.enable": false, + "key.converter": "org.apache.kafka.connect.storage.StringConverter", "connection.url": "http://elasticsearch:9200", - "type.name": "kafka-connect" + "type.name": "kafka-connect", + "write.method": "upsert", + "behavior.on.null.values": "delete", + "transforms": "key,ExtractFieldObject", + "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key", + "transforms.key.field": "Id", + "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value", + "transforms.ExtractFieldObject.field": "after" } } \ No newline at end of file diff --git a/labs/doris/conf/kafka-connect/mysql2kafka.json b/labs/doris/conf/kafka-connect/mysql2kafka.json index b1da9780..b368d326 100644 --- a/labs/doris/conf/kafka-connect/mysql2kafka.json +++ b/labs/doris/conf/kafka-connect/mysql2kafka.json @@ -1,5 +1,5 @@ { - "name": "msyql-source", + "name": "mysql-source", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", diff --git a/labs/doris/conf/kafka-connect/start.sh b/labs/doris/conf/kafka-connect/start.sh index 4856ed25..aecaf74b 100644 --- a/labs/doris/conf/kafka-connect/start.sh +++ b/labs/doris/conf/kafka-connect/start.sh @@ -1,3 +1,7 @@ #!/bin/bash +echo 'init connectors start:' +echo 'mysql2kafka' curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @mysql2kafka.json -curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @kafka2elasticsearch.json \ No newline at end of file +echo 'kafka2elasticsearch' +curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @kafka2elasticsearch.json +echo 'init connectors end!' \ No newline at end of file diff --git a/labs/doris/readme.md b/labs/doris/readme.md index 42ae1c06..82d1181c 100644 --- a/labs/doris/readme.md +++ b/labs/doris/readme.md @@ -108,7 +108,33 @@ SHOW ALL ROUTINE LOAD; 创建导入任务: -CREATE ROUTINE LOAD example.job1 on User +CREATE ROUTINE LOAD example.job2 on User +--ALTER ROUTINE LOAD FOR example.job1 +columns ( + `Id`, + `UserName`, + `SecurityStamp`, + `PasswordHash`, + `PasswordConfirmed`, + `Email`, + `EmailConfirmed`, + `PhoneNumber`, + `PhoneNumberConfirmed`, + `RealName`, + `IdentityNumber`, + `IdentityConfirmed`, + `NickName`, + `Avatar`, + `Sex`, + `Birthday`, + `LockoutEnabled`, + `AccessFailedCount`, + `LockoutEnd`, + `RowVersion`, + `Created`, + `Modified`, + `Deleted` + ) PROPERTIES ( "format"="json",