Автор оригинала: Vlad Mihalcea.
Вступление
Как объяснялось ранее, CDC (Сбор данных об изменениях) является одним из лучших способов подключения системы баз данных OLTP к другим системам, таким как хранилище данных, Кэш, Spark или Hadoop.
Debezium -это проект с открытым исходным кодом, разработанный Red Hat, который направлен на упрощение этого процесса, позволяя извлекать изменения из различных систем баз данных (например, MySQL, PostgreSQL, MongoDB) и отправлять их в Apache Kafka .
В этой статье мы рассмотрим, как вы можете извлекать события из двоичных журналов MySQL с помощью Debezium.
Архитектура Debezium
Во-первых, вам нужен разъем Debezium для конкретной базы данных, чтобы иметь возможность извлекать журнал повтора (например, Oracle), Двоичный журнал (например, MySQL) или журналы с опережающей записью (например, PostgreSQL).
Вам также необходимо запустить Kafka, чтобы вы могли передавать извлеченные события журнала и предоставлять их другим службам в вашей корпоративной системе. Apache ZooKeeper нужен не Debezium, а Кафке, поскольку он полагается на него для достижения консенсуса, а также для гарантий линеаризуемости.
Установка Debian
Если вы хотите попробовать Debezium, вы можете следовать этому очень обширному учебному пособию , предлагаемому в разделе документации Debezium.
В принципе, вам нужно запустить следующие контейнеры Docker:
> docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.5 > docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.5 > docker run -it --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5 > docker run -it --name kafka-connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.5 > docker run -it --name kafka-watcher --link zookeeper:zookeeper debezium/kafka:0.5 watch-topic -a -k dbserver1.inventory.customers
После этого у вас должны быть следующие контейнеры, перечисленные Docker:
> docker ps -a CONTAINER ID IMAGE NAMES bbfeafd9125c debezium/kafka:0.5 kafka-watcher 4cfffedae69c debezium/connect:0.5 kafka-connect 36734bc82864 debezium/example-mysql:0.5 mysql daaaab6f3206 debezium/kafka:0.5 kafka 8a7affd3e2a4 debezium/zookeeper:0.5 zookeeper
Используя bash, вам необходимо создать новый соединитель:
> curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
Обратите внимание, что kafka-watcher
был запущен в интерактивном режиме, чтобы мы могли видеть в консоли события журнала CDC, записанные Debezium.
Время тестирования
Теперь, если мы подключимся к контейнеру MySQL Docker с помощью root
пользователя и debezium
пароля, мы сможем выдавать различные инструкции SQL и проверять вывод консоли kafka-watcher
container.
ВСТАВЛЯТЬ
При вставке новой строки клиент
:
INSERT INTO `inventory`.`customers` ( `first_name`, `last_name`, `email`) VALUES ( 'Vlad', 'Mihalcea', 'vlad@acme.org' )
В наблюдатель кафки
теперь мы можем найти следующую запись JSON:
{ "payload":{ "before":null, "after":{ "id":1005, "first_name":"Vlad", "last_name":"Mihalcea", "email":"vlad@acme.org" }, "source":{ "name":"dbserver1", "server_id":223344, "ts_sec":1500369632, "gtid":null, "file":"mysql-bin.000003", "pos":364, "row":0, "snapshot":null, "thread":13, "db":"inventory", "table":"customers" }, "op":"c", "ts_ms":1500369632095 } }
Объект до
имеет значение null
, в то время как объект после
показывает только что вставленное значение. Обратите внимание , что значение атрибута op
равно c
, что означает, что это событие СОЗДАНИЯ.
ОБНОВЛЕНИЕ
При обновлении строки клиент
:
UPDATE `inventory`.`customers` SET `email` = 'vlad.mihalcea@acme.org' WHERE `id` = 1005
Теперь мы можем найти следующее событие журнала:
{ "payload":{ "before":{ "id":1005, "first_name":"Vlad", "last_name":"Mihalcea", "email":"vlad@acme.org" }, "after":{ "id":1005, "first_name":"Vlad", "last_name":"Mihalcea", "email":"vlad.mihalcea@acme.org" }, "source":{ "name":"dbserver1", "server_id":223344, "ts_sec":1500369929, "gtid":null, "file":"mysql-bin.000003", "pos":673, "row":0, "snapshot":null, "thread":13, "db":"inventory", "table":"customers" }, "op":"u", "ts_ms":1500369929464 } }
Значение атрибута op
равно u
, что означает, что у нас есть событие журнала ОБНОВЛЕНИЙ. Объект до
показывает состояние строки до обновления, в то время как объект после
фиксирует текущее состояние обновленной строки клиент
базы данных.
УДАЛИТЬ
При выдаче инструкции по удалению:
DELETE FROM `inventory`.`customers` WHERE id = 1005;
Следующее событие записывается контейнером kafka-connect
Docker:
{ "payload":{ "before":{ "id":1005, "first_name":"Vlad", "last_name":"Mihalcea", "email":"vlad.mihalcea@acme.org" }, "after":null, "source":{ "name":"dbserver1", "server_id":223344, "ts_sec":1500370394, "gtid":null, "file":"mysql-bin.000003", "pos":1025, "row":0, "snapshot":null, "thread":13, "db":"inventory", "table":"customers" }, "op":"d", "ts_ms":1500370394589 } }
Значение атрибута op
равно d
, что означает, что у нас есть событие УДАЛЕНИЯ журнала, и после
объект теперь null
. Объект before
фиксирует состояние строки базы данных до ее удаления.
Блестяще, правда?
Вывод
Debezium-это потрясающий инструмент, который вы можете использовать для соединения ваших интерфейсных систем OLTP с Apache Kafka, который служит основой вашей корпоративной системы.
Планируется разработать еще много соединителей, так что следите за обновлениями и тоже принимайте участие.