Рубрики
Без рубрики

Пользовательские Конвертеры Debezium – Конвертер временных меток

Пользовательские конвертеры Debezium Создание пользовательских конвертеров с использованием нового SPI Debezium для переопределения… С тегами debezium, cdc, java, учебник.

Создание пользовательских конвертеров с использованием нового SPI Debezium для переопределения преобразований значений

Вступление

Справочная информация о преобразователе временных меток

Привет, меня зовут Райан Мошер, и я основал собственное сообщество для старших разработчиков в Израиле под названием in.dev . Я также являюсь ведущим архитектором в Речной , Мы являемся полностью управляемой платформой интеграции данных, и частью моей работы здесь является разработка возможности потоковой передачи изменений прямо из баз данных клиентов на нашу платформу с использованием системы сбора данных об изменениях (CDC).

Последний раз я кодировал на Java 7 лет назад, поэтому, если у вас есть какие-либо предложения по улучшению кода, показанного здесь, пожалуйста, не стесняйтесь комментировать ниже!

Вы можете найти конвертер прямо здесь: https://github.com/oryanmoshe/debezium-timestamp-converter/

CDC — Сбор данных Об изменениях

Прежде чем мы поговорим о Дебезии, мы должны поговорить о CDC.

CDC – это способ для нас получить изменения , происходящие в базе данных (в отличие от фактических данных ) Это означает, что мы действительно можем получить каждое состояние что каждая запись была просмотрена в базе данных.

CDC полезен в ряде случаев:

  • Составление журнала изменений записей
  • Отмена (или возврат) изменения
  • Удаление записей отслеживания (что не является простым вопросом использования SELECT )

В любом случае, что такое Дебезиум?

Debezium – это платформа с открытым исходным кодом, поддерживаемая Red Hat, которая позволяет разработчикам внедрять CDC в инфраструктуру Kafka. Debezium активирует CDC, настраивая соединения с помощью предоставленных соединителей данных Kafka Connect. В настоящее время существует поддержка MySQL, PostgreSQL, Microsoft SQL Server, MongoDB и даже некоторая ограниченная поддержка Oracle.

Что такое конвертеры и зачем нам нужен пользовательский?

Все сообщения, созданные Debezium, обрабатываются перед вводом в указанную тему. Это гарантирует, что все поля данного типа (определенного схемой) ведут себя одинаково. Другими словами, все |/ДАТА поля вкл. все базы данных будут преобразованы в один и тот же формат. По умолчанию это “Дни с момента эпохи”. Но такое поведение не всегда желательно, особенно в этом временном примере.

Для нашего конкретного случая использования нам нужно, чтобы все временные поля были в одном формате, независимо от типа ДАТА , ДАТА-ВРЕМЯ , ВРЕМЯ ДАТЫ2 , время или МЕТКА ВРЕМЕНИ . Формат, который мы выбрали, был ГГГГ-ММ-дд'Т'Ч:мм:ss.SSS'Z' .

Создание пользовательского конвертера

Вот объяснение каждого шага, необходимого для создания нашего Преобразователя временных меток .

Основы пользовательских конвертеров

Чтобы разрешить такое поведение, в Debezium версии 1.1 был добавлен SPI Debezium (Интерфейс поставщика услуг). Это позволяет разработчикам создавать свои собственные конвертеры с Java, создавая класс, реализующий io.debezium.spi.converter. Пользовательский конвертер интерфейс.

Первый Попался

Чего мы не знали, когда начинали разрабатывать этот конвертер, так это того, что как только мы зарегистрировали пользовательский конвертер во временную колонку, поведение Debezium стало спорадическим. Иногда он будет передавать столбец ДАТА как “Дни с момента эпохи”, как и ожидалось, но иногда он будет передавать его в виде строки, соответствующей формату даты базы данных, из которой он был получен.

Это означало, что мы должны были охватить все наши базы данных, как для числовых значений (скажем, “Дней с эпохи”), так и для всех баз данных формата даты, которые могут создавать ( ГГГГ-ММ-дд , дд/ММ/ГГГГ , ГГГГ-ММ-дд и т.д.)

Все стало немного сложнее с точки зрения логики, но давайте не будем вдаваться в это прямо сейчас.

Что нужно для работы нашего пользовательского конвертера

Каждый преобразователь должен реализовать по крайней мере два метода, которые будут использоваться Debezium:

конфигурировать

Этот метод запускается при инициализации соединителя. Он принимает один аргумент:

реквизит |/Объект типа java.util. Свойства , содержащие все свойства, которые мы передали нашему экземпляру конвертера.

Преобразователь для

Этот метод выполняется один раз для каждого столбца, определенного в нашей схеме, и его задача состоит в том, чтобы определить (он же “регистр”) преобразователь для каждого. Он принимает два аргумента:

колонка Объект типа io.debezium.spi.конвертер. Реляционный столбец , содержащий определение столбца, который мы в настоящее время обрабатываем, включая его имя, тип, размер, таблицу и т.д.

регистрация Объект типа io.debezium.spi.конвертер. Пользовательский Конвертер. Регистрация конвертера , внутреннее определение, имеющее один метод: регистрация .

Использование метода настройки

Как указано выше, мы используем метод configure для передачи свойств в наш конвертер. Это важно, потому что мы можем использовать один и тот же преобразователь для нескольких разъемов и изменять его поведение в соответствии с этими свойствами.

Для нашего Преобразователя временных меток мы хотели передать четыре свойства:

  • отлаживать – Указывает, следует ли печатать отладочные сообщения. По умолчанию false .
  • формат.дата – Формат для преобразования всех столбцов типа ДАТА . По умолчанию ГГГГ-ММ-дд .
  • формат.время – Формат для преобразования всех столбцов типа ВРЕМЯ . По умолчанию ЧЧ:мм:сс .
  • формат.дата-время – Формат для преобразования всех остальных временных столбцов . По умолчанию используется ГГГГ-ММ-дд'Т'Ч:мм:сс.ССС 'З' .

Все эти свойства являются необязательными и имеют связанные с ними значения по умолчанию. Для их поддержки мы определили каждый из них как свойство класса со значением по умолчанию. Внутри метода configure мы присвоили им переданное значение:

public class TimestampConverter implements CustomConverter {

    public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    public static final String DEFAULT_DATE_FORMAT = "YYYY-MM-dd";
    public static final String DEFAULT_TIME_FORMAT = "HH:mm:ss.SSS";

    public String strDatetimeFormat, strDateFormat, strTimeFormat;
    public Boolean debug;

    private SimpleDateFormat simpleDatetimeFormatter, simpleDateFormatter, simpleTimeFormatter;

    @Override
    public void configure(Properties props) {
        this.strDatetimeFormat = props.getProperty("format.datetime", DEFAULT_DATETIME_FORMAT);
        this.simpleDatetimeFormatter = new SimpleDateFormat(this.strDatetimeFormat);

        this.strDateFormat = props.getProperty("format.date", DEFAULT_DATE_FORMAT);
        this.simpleDateFormatter = new SimpleDateFormat(this.strDateFormat);

        this.strTimeFormat = props.getProperty("format.time", DEFAULT_TIME_FORMAT);
        this.simpleTimeFormatter = new SimpleDateFormat(this.strTimeFormat);

        this.debug = props.getProperty("debug", "false").equals("true");

        this.simpleDatetimeFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        this.simpleTimeFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
    }
}

Использование преобразователя Для метода

Теперь пришло время для важного момента. Каждый столбец должен быть преобразован в соответствующий формат.

Прежде всего, мы должны понять тип столбца, с которым мы в настоящее время работаем. Это определяется с помощью column.typename . Если тип является любым из временных типов (определяется как константа класса), мы обрабатываем его соответствующим образом. Если это не так, мы ничего не предпримем, и Дебезиум возьмет управление на себя.

Чтобы сказать Debezium преобразовать определенный столбец во что-то другое, нам нужно использовать регистрацию , переданную нам. Затем зарегистрируйтесь это, предоставляя схему (создайте один из типов строка и сделайте его необязательным ) и преобразователем.

Преобразователь – это просто функция, или в нашем случае лямбда, которая получает Объект . Это исходное значение и возвращает значение, соответствующее предоставленной нами схеме. В нашем случае нам нужно было вернуть Строка (или null , потому что мы сделали это необязательным ).

@Override
public void converterFor(RelationalColumn column, ConverterRegistration registration) {
    if (SUPPORTED_DATA_TYPES.stream().anyMatch(s -> s.toLowerCase().equals(column.typeName().toLowerCase()))) {
        boolean isTime = "time".equals(column.typeName().toLowerCase());
        registration.register(datetimeSchema, rawValue -> {
            if (rawValue == null)
                return rawValue;

            Long millis = getMillis(rawValue.toString(), isTime);

            Instant instant = Instant.ofEpochMilli(millis);
            Date dateObject = Date.from(instant);

            switch (column.typeName().toLowerCase()) {
                case "time":
                    return simpleTimeFormatter.format(dateObject);
                case "date":
                    return simpleDateFormatter.format(dateObject);
                default:
                    return simpleDatetimeFormatter.format(dateObject);
            }
        });
    }
}

В этом фрагменте кода рассмотрим две важные части, о которых мы упоминали ранее. Это вызов регистрация.регистрация и возврат заявления.

Использование пользовательского конвертера с Debezium

Установка

Установка в нашем кластере Debezium осуществляется прямолинейно. Нам просто нужно добавить файл .jar конвертера в разъем, в котором мы хотим его использовать.

Второй Попался

Заметьте, я сказал”… к разъему, который мы хотим…”, это то, что Дебезиум не разъяснил в документации. Нам нужно добавить этот конвертер в каждый разъем , если мы хотим его использовать. Допустим, базовая папка для соединителей – /кафка/подключить . Затем внутри мы найдем папки, такие как debezium-connector-mysql или debezium-connector-postgres .

Нам нужно добавить конвертер .jar файл в каждую из этих папок, если мы намерены его использовать.

Конфигурация

После добавления .jar

Для этого все, что нам нужно сделать, это добавить следующие ключи в нашу существующую конфигурацию:

"converters": "timestampConverter",
"timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter"

Если нам нужно настроить форматы определенных типов данных, мы можем использовать эти дополнительные ключи конфигурации:

"timestampConverter.format.time": "HH:mm:ss.SSS",
"timestampConverter.format.date": "YYYY-MM-dd",
"timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'",
"timestampConverter.debug": "false"

Выводы

Добавление SPI в Debezium многое изменило в плане пользовательских конвертеров. Это позволяет нам получить специально подходящий разъем CDC, с потоковой передачей данных в наш кластер Kafka именно в том формате, который мы хотим.

Я не включил фактическую логику, преобразующую значения из их необработанного формата во время эпохи (эта часть содержится в методе getMillis ) Но Я опубликовал Конвертер временных меток с открытым исходным кодом, поэтому любой может прочитать код там, использовать конвертер в приложении (будь то .jar файл, найденный в разделе релизы, или как зависимость, найденная в разделе пакеты), или внести свой вклад в его разработку!

Не стесняйтесь предлагать вклад в этот конвертер и делиться со мной, какие конвертеры вы созданы с использованием нового SPI Debezium, и какие из них вы хотели бы сделать!

Связи

Чтобы узнать больше о пользовательском конвертере Debezium, посетите их официальную документацию: https://debezium.io/documentation/reference/1.1/development/converters.html

Ссылка на репозиторий моего Конвертера временных меток : Ссылка на репозиторий моего

Оригинал: “https://dev.to/oryanmoshe/debezium-custom-converters-timestampconverter-26hh”