Рубрики
Без рубрики

Как извлечь события изменения данных из MySQL в Kafka с помощью Debezium

Узнайте, как извлекать события CDC (Сбор данных об изменении) из двоичного журнала MySQL с помощью Debezium и отправлять их в Apache Kafka.

Автор оригинала: Vlad Mihalcea.

Вступление

Как объяснялось ранее, CDC (Сбор данных об изменениях) является одним из лучших способов подключения системы баз данных OLTP к другим системам, таким как хранилище данных, Кэш, Spark или Hadoop.

Debezium -это проект с открытым исходным кодом, разработанный Red Hat, который направлен на упрощение этого процесса, позволяя извлекать изменения из различных систем баз данных (например, MySQL, PostgreSQL, MongoDB) и отправлять их в Apache Kafka .

В этой статье мы рассмотрим, как вы можете извлекать события из двоичных журналов MySQL с помощью Debezium.

Архитектура Debezium

Во-первых, вам нужен разъем Debezium для конкретной базы данных, чтобы иметь возможность извлекать журнал повтора (например, Oracle), Двоичный журнал (например, MySQL) или журналы с опережающей записью (например, PostgreSQL).

Вам также необходимо запустить Kafka, чтобы вы могли передавать извлеченные события журнала и предоставлять их другим службам в вашей корпоративной системе. Apache ZooKeeper нужен не Debezium, а Кафке, поскольку он полагается на него для достижения консенсуса, а также для гарантий линеаризуемости.

Установка Debian

Если вы хотите попробовать Debezium, вы можете следовать этому очень обширному учебному пособию , предлагаемому в разделе документации Debezium.

В принципе, вам нужно запустить следующие контейнеры Docker:

> docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.5

> docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.5

> docker run -it --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5

> docker run -it --name kafka-connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.5

> docker run -it --name kafka-watcher --link zookeeper:zookeeper debezium/kafka:0.5 watch-topic -a -k dbserver1.inventory.customers

После этого у вас должны быть следующие контейнеры, перечисленные Docker:

> docker ps -a

CONTAINER ID        IMAGE                          NAMES
bbfeafd9125c        debezium/kafka:0.5             kafka-watcher
4cfffedae69c        debezium/connect:0.5           kafka-connect
36734bc82864        debezium/example-mysql:0.5     mysql
daaaab6f3206        debezium/kafka:0.5             kafka
8a7affd3e2a4        debezium/zookeeper:0.5         zookeeper

Используя bash, вам необходимо создать новый соединитель:

> curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

Обратите внимание, что kafka-watcher был запущен в интерактивном режиме, чтобы мы могли видеть в консоли события журнала CDC, записанные Debezium.

Время тестирования

Теперь, если мы подключимся к контейнеру MySQL Docker с помощью root пользователя и debezium пароля, мы сможем выдавать различные инструкции SQL и проверять вывод консоли kafka-watcher container.

ВСТАВЛЯТЬ

При вставке новой строки клиент :

INSERT INTO `inventory`.`customers`
(
    `first_name`,
    `last_name`,
    `email`)
VALUES
(
    'Vlad',
    'Mihalcea',
    'vlad@acme.org'
)

В наблюдатель кафки теперь мы можем найти следующую запись JSON:

{  
   "payload":{  
      "before":null,
      "after":{  
         "id":1005,
         "first_name":"Vlad",
         "last_name":"Mihalcea",
         "email":"vlad@acme.org"
      },
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369632,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":364,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"c",
      "ts_ms":1500369632095
   }
}

Объект до имеет значение null , в то время как объект после показывает только что вставленное значение. Обратите внимание , что значение атрибута op равно c , что означает, что это событие СОЗДАНИЯ.

ОБНОВЛЕНИЕ

При обновлении строки клиент :

UPDATE `inventory`.`customers`
SET
    `email` = 'vlad.mihalcea@acme.org'
WHERE 
    `id` = 1005

Теперь мы можем найти следующее событие журнала:

{
    "payload":{ 
      "before":{ 
         "id":1005,
         "first_name":"Vlad",
         "last_name":"Mihalcea",
         "email":"vlad@acme.org"
      },
      "after":{ 
         "id":1005,
         "first_name":"Vlad",
         "last_name":"Mihalcea",
         "email":"vlad.mihalcea@acme.org"
      },
      "source":{ 
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369929,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":673,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"u",
      "ts_ms":1500369929464
   }
}

Значение атрибута op равно u , что означает, что у нас есть событие журнала ОБНОВЛЕНИЙ. Объект до показывает состояние строки до обновления, в то время как объект после фиксирует текущее состояние обновленной строки клиент базы данных.

УДАЛИТЬ

При выдаче инструкции по удалению:

DELETE FROM `inventory`.`customers`
WHERE id = 1005;

Следующее событие записывается контейнером kafka-connect Docker:

{ 
    "payload":{  
      "before":{  
         "id":1005,
         "first_name":"Vlad",
         "last_name":"Mihalcea",
         "email":"vlad.mihalcea@acme.org"
      },
      "after":null,
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500370394,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":1025,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"d",
      "ts_ms":1500370394589
   }
}

Значение атрибута op равно d , что означает, что у нас есть событие УДАЛЕНИЯ журнала, и после объект теперь null . Объект before фиксирует состояние строки базы данных до ее удаления.

Блестяще, правда?

Вывод

Debezium-это потрясающий инструмент, который вы можете использовать для соединения ваших интерфейсных систем OLTP с Apache Kafka, который служит основой вашей корпоративной системы.

Планируется разработать еще много соединителей, так что следите за обновлениями и тоже принимайте участие.