From 5e1df6c47b13563bd259c52682049f98f891e1ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=B5=B7?= <10402852@qq.com> Date: Mon, 18 Nov 2024 10:57:43 +0800 Subject: [PATCH] 'commit' --- .../converter/MySqlDateTimeConverter.java | 125 ------------------ 1 file changed, 125 deletions(-) delete mode 100644 src/main/java/com/darcytech/debezium/converter/MySqlDateTimeConverter.java diff --git a/src/main/java/com/darcytech/debezium/converter/MySqlDateTimeConverter.java b/src/main/java/com/darcytech/debezium/converter/MySqlDateTimeConverter.java deleted file mode 100644 index aec4b779..00000000 --- a/src/main/java/com/darcytech/debezium/converter/MySqlDateTimeConverter.java +++ /dev/null @@ -1,125 +0,0 @@ -package com.darcytech.debezium.converter; - -import io.debezium.spi.converter.CustomConverter; -import io.debezium.spi.converter.RelationalColumn; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.connect.data.SchemaBuilder; - -import java.time.*; -import java.time.format.DateTimeFormatter; -import java.util.Properties; -import java.util.function.Consumer; - -/** - * 处理Debezium时间转换的问题 - * Debezium默认将MySQL中datetime类型转成UTC的时间戳({@link io.debezium.time.Timestamp}),时区是写死的没法儿改, - * 导致数据库中设置的UTC+8,到kafka中变成了多八个小时的long型时间戳 - * Debezium默认将MySQL中的timestamp类型转成UTC的字符串。 - * | mysql | mysql-binlog-connector | debezium | - * | ----------------------------------- | ---------------------------------------- | --------------------------------- | - * | date
(2021-01-28) | LocalDate
(2021-01-28) | Integer
(18655) | - * | time
(17:29:04) | Duration
(PT17H29M4S) | Long
(62944000000) | - * | timestamp
(2021-01-28 17:29:04) | ZonedDateTime
(2021-01-28T09:29:04Z) | String
(2021-01-28T09:29:04Z) | - * | Datetime
(2021-01-28 17:29:04) | LocalDateTime
(2021-01-28T17:29:04) | Long
(1611854944000) | - * - * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter - */ -@Slf4j -public class MySqlDateTimeConverter implements CustomConverter { - - private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE; - private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME; - private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME; - private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME; - - private ZoneId timestampZoneId = ZoneId.systemDefault(); - - @Override - public void configure(Properties props) { - readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p)); - readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p)); - readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p)); - readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p)); - readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z)); - } - - private void readProps(Properties properties, String settingKey, Consumer callback) { - String settingValue = (String) properties.get(settingKey); - if (settingValue == null || settingValue.length() == 0) { - return; - } - try { - callback.accept(settingValue.trim()); - } catch (IllegalArgumentException | DateTimeException e) { - log.error("The \"{}\" setting is illegal:{}", settingKey, settingValue); - throw e; - } - } - - @Override - public void converterFor(RelationalColumn column, ConverterRegistration registration) { - String sqlType = column.typeName().toUpperCase(); - SchemaBuilder schemaBuilder = null; - Converter converter = null; - if ("DATE".equals(sqlType)) { - schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string"); - converter = this::convertDate; - } - if ("TIME".equals(sqlType)) { - schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string"); - converter = this::convertTime; - } - if ("DATETIME".equals(sqlType)) { - schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string"); - converter = this::convertDateTime; - } - if ("TIMESTAMP".equals(sqlType)) { - schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string"); - converter = this::convertTimestamp; - } - if (schemaBuilder != null) { - registration.register(schemaBuilder, converter); - log.info("register converter for sqlType {} to schema {}", sqlType, schemaBuilder.name()); - } - } - - private String convertDate(Object input) { - if (input instanceof LocalDate) { - return dateFormatter.format((LocalDate) input); - } - if (input instanceof Integer) { - LocalDate date = LocalDate.ofEpochDay((Integer) input); - return dateFormatter.format(date); - } - return null; - } - - private String convertTime(Object input) { - if (input instanceof Duration) { - Duration duration = (Duration) input; - long seconds = duration.getSeconds(); - int nano = duration.getNano(); - LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano); - return timeFormatter.format(time); - } - return null; - } - - private String convertDateTime(Object input) { - if (input instanceof LocalDateTime) { - return datetimeFormatter.format((LocalDateTime) input); - } - return null; - } - - private String convertTimestamp(Object input) { - if (input instanceof ZonedDateTime) { - // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间 - ZonedDateTime zonedDateTime = (ZonedDateTime) input; - LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime(); - return timestampFormatter.format(localDateTime); - } - return null; - } - -}