diff --git a/src/main/java/com/darcytech/debezium/converter/MySqlDateTimeConverter.java b/src/main/java/com/darcytech/debezium/converter/MySqlDateTimeConverter.java
deleted file mode 100644
index aec4b779..00000000
--- a/src/main/java/com/darcytech/debezium/converter/MySqlDateTimeConverter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package com.darcytech.debezium.converter;
-
-import io.debezium.spi.converter.CustomConverter;
-import io.debezium.spi.converter.RelationalColumn;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.connect.data.SchemaBuilder;
-
-import java.time.*;
-import java.time.format.DateTimeFormatter;
-import java.util.Properties;
-import java.util.function.Consumer;
-
-/**
- * 处理Debezium时间转换的问题
- * Debezium默认将MySQL中datetime类型转成UTC的时间戳({@link io.debezium.time.Timestamp}),时区是写死的没法儿改,
- * 导致数据库中设置的UTC+8,到kafka中变成了多八个小时的long型时间戳
- * Debezium默认将MySQL中的timestamp类型转成UTC的字符串。
- * | mysql | mysql-binlog-connector | debezium |
- * | ----------------------------------- | ---------------------------------------- | --------------------------------- |
- * | date
(2021-01-28) | LocalDate
(2021-01-28) | Integer
(18655) |
- * | time
(17:29:04) | Duration
(PT17H29M4S) | Long
(62944000000) |
- * | timestamp
(2021-01-28 17:29:04) | ZonedDateTime
(2021-01-28T09:29:04Z) | String
(2021-01-28T09:29:04Z) |
- * | Datetime
(2021-01-28 17:29:04) | LocalDateTime
(2021-01-28T17:29:04) | Long
(1611854944000) |
- *
- * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
- */
-@Slf4j
-public class MySqlDateTimeConverter implements CustomConverter {
-
- private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
- private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
- private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
- private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
-
- private ZoneId timestampZoneId = ZoneId.systemDefault();
-
- @Override
- public void configure(Properties props) {
- readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
- readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
- readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
- readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
- readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
- }
-
- private void readProps(Properties properties, String settingKey, Consumer callback) {
- String settingValue = (String) properties.get(settingKey);
- if (settingValue == null || settingValue.length() == 0) {
- return;
- }
- try {
- callback.accept(settingValue.trim());
- } catch (IllegalArgumentException | DateTimeException e) {
- log.error("The \"{}\" setting is illegal:{}", settingKey, settingValue);
- throw e;
- }
- }
-
- @Override
- public void converterFor(RelationalColumn column, ConverterRegistration registration) {
- String sqlType = column.typeName().toUpperCase();
- SchemaBuilder schemaBuilder = null;
- Converter converter = null;
- if ("DATE".equals(sqlType)) {
- schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
- converter = this::convertDate;
- }
- if ("TIME".equals(sqlType)) {
- schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
- converter = this::convertTime;
- }
- if ("DATETIME".equals(sqlType)) {
- schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
- converter = this::convertDateTime;
- }
- if ("TIMESTAMP".equals(sqlType)) {
- schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
- converter = this::convertTimestamp;
- }
- if (schemaBuilder != null) {
- registration.register(schemaBuilder, converter);
- log.info("register converter for sqlType {} to schema {}", sqlType, schemaBuilder.name());
- }
- }
-
- private String convertDate(Object input) {
- if (input instanceof LocalDate) {
- return dateFormatter.format((LocalDate) input);
- }
- if (input instanceof Integer) {
- LocalDate date = LocalDate.ofEpochDay((Integer) input);
- return dateFormatter.format(date);
- }
- return null;
- }
-
- private String convertTime(Object input) {
- if (input instanceof Duration) {
- Duration duration = (Duration) input;
- long seconds = duration.getSeconds();
- int nano = duration.getNano();
- LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
- return timeFormatter.format(time);
- }
- return null;
- }
-
- private String convertDateTime(Object input) {
- if (input instanceof LocalDateTime) {
- return datetimeFormatter.format((LocalDateTime) input);
- }
- return null;
- }
-
- private String convertTimestamp(Object input) {
- if (input instanceof ZonedDateTime) {
- // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间
- ZonedDateTime zonedDateTime = (ZonedDateTime) input;
- LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
- return timestampFormatter.format(localDateTime);
- }
- return null;
- }
-
-}