Change Data Capture — Convert your database into a stream with Debezium

Have you ever thought about creating a stream from database operations? In this story, you will learn what Change Data Capture is and how to use it when planning your system architecture. In the practical part, we will see Debezium in action.

What is Change Data Capture?

Change Data Capture is a process of detecting changes made to the database. The changes can then be streamed and integrated with other databases and systems. In other words: we receive a stream of events from our database.

This allows us to make faster and more accurate decisions based on data (Stream Processing and Streaming ETL). It does not overload systems and networks the way classic batch solutions do. Often it is the only reasonable way to modernize legacy systems.

Types of Change Data Capture

QUERY-BASED

What if I query the database every 5 seconds? Is this Change Data Capture? In a way, yes, but this method has its drawbacks (a sketch of such a polling query follows the list):

  1. This method requires proper preparation of the table schema: a column with the time of record modification and the mechanism of its update;
  2. We do not detect all operations. In the case of record deletion, we must apply the soft delete trick, that is, mark the record with a delete flag, and physically delete it later using another mechanism;
  3. We do not detect changes in the table structure;
  4. We can overload the database or network with queries if there is a lot of data;
  5. We do not see the previous state of a record, only its latest one.
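A minimal sketch of such a polling query, assuming a hypothetical some_table with the modification_time column from point 1 and a last_sync timestamp remembered by the polling job:

-- runs e.g. every 5 seconds; :last_sync is the timestamp of the previous poll
SELECT *
FROM some_table
WHERE modification_time > :last_sync
ORDER BY modification_time;

Deletes and any intermediate states between two polls are invisible to this query (points 2 and 5 above).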

Do all these drawbacks disqualify this approach? It depends on the context. Logstash, for example, can be used this way to synchronize Elasticsearch with a database. It is a simple solution, and if it meets our requirements, there is no point in maintaining yet another technology.

LOG-BASED

Generally speaking, databases record all transactions and modifications in a transaction log (for example, the binlog in MySQL or the write-ahead log in PostgreSQL). It is a key component: in case of failure, it allows the database to restore a consistent state.

By reading such a log, we can detect all inserts, updates, and deletes, as well as changes to the table schema. The changes can be streamed immediately or in batches. The load on the database is negligible, certainly lower than with Query-Based CDC.
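In MySQL, for instance, the log in question is the binary log. You can verify that it is enabled and check the current position with standard statements:

SHOW VARIABLES LIKE 'log_bin';  -- ON means the binlog is enabled (the default in MySQL 8)
SHOW MASTER STATUS;             -- current binlog file and position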

Each database approaches Change Data Capture in its own way. You can find information on how SQL Server does it here. In the second part of this article, we will use Debezium, an open-source, distributed CDC platform, together with a MySQL database.

How can I use this?

Writing to multiple sources

We start with a simple application and a single database. With time, we add a cache (Redis, Memcached), a search engine (Elasticsearch, Solr), queues (Kafka, RabbitMQ), microservices, and multiple databases (Polyglot Persistence).

Ultimately, the data has multiple targets. Sending a record to all databases through a single service complicates the logic and makes it difficult to maintain consistency (Two-Phase Commit has its own issues). And it may be too late to introduce Event Sourcing into the application's architecture.

Database integration

Have you ever seen the “integration” of data between databases based on SELECT * FROM table? I have. I didn’t know whether to laugh or cry 😉. Change Data Capture would work well in this case too, and it would certainly be much less invasive.

ETL and Stream Processing

Creating a stream from a database allows us to apply stream processing technology to ETL processes. Architectures like Lambda or Kappa speed up decision-making. Additionally, not all operations can be parallelized. We know from Amdahl’s Law that even mostly parallel code does not scale linearly. The streaming approach distributes processing over time, which has a positive impact on resource utilization.
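For reference, Amdahl’s Law gives the maximum speedup of a workload whose fraction p can be parallelized, run on N processors:

S(N) = 1 / ((1 - p) + p / N)

Even with p = 0.95, the speedup is capped at 1 / (1 - 0.95) = 20x, no matter how many processors we add.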

Debezium

Debezium is an open-source, distributed CDC platform. In practice, it is a collection of connectors for Kafka Connect, so the solution is based on Apache Kafka. There are connectors for MySQL, PostgreSQL, MongoDB, Cassandra, SQL Server, Oracle, Db2, and Vitess.

Environment

The newer the technology, the more difficult it is to run Hello World 😅. We will use Docker Compose with:

  • Zookeeper,
  • Apache Kafka,
  • AKHQ — Web GUI for Kafka,
  • Kafka Connect with Debezium,
  • MySQL version 8.
version: '2'

services:
  zookeeper:
    image: 'docker.io/bitnami/zookeeper:3-debian-10'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'docker.io/bitnami/kafka:2-debian-10'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    depends_on:
      - zookeeper
  connect:
    image: debezium/connect:1.4
    ports:
      - 8083:8083
    environment:
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - GROUP_ID=1
      - BOOTSTRAP_SERVERS=kafka:9092
  akhq:
    image: tchiotludo/akhq
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "kafka:9092"
              connect:
                - name: "connect"
                  url: "http://connect:8083"
    ports:
      - 8080:8080
    links:
      - kafka
      - zookeeper
  mysql:
    image: mysql:8
    ports:
      - 3306:3306
    environment:
      MYSQL_DATABASE: 'school'
      MYSQL_USER: 'user'
      MYSQL_PASSWORD: 'password'
      MYSQL_ROOT_PASSWORD: 'password'
AKHQ makes it easy to manage an Apache Kafka cluster
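Save the file as docker-compose.yml and bring the stack up with:

sudo docker-compose up -d

AKHQ should then be reachable at http://localhost:8080 and the Kafka Connect REST API at http://localhost:8083.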

Configuring the Debezium MySQL connector in Kafka Connect

I’ve dedicated a separate story to Kafka Connect. Now that we have AKHQ, let’s try to set the connector up by clicking through the UI.

All the available Debezium connectors are visible

The configuration used is shown below. AKHQ prompts us for all possible options along with explanations.

tasks.max = 1
database.history.kafka.topic = schema-changes.school
database.hostname = mysql
database.user = user
database.password = password
database.server.name = school
database.include.list = school
database.history.kafka.bootstrap.servers = kafka:9092
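If you prefer the Kafka Connect REST API to clicking through AKHQ, the same connector can be registered with a single POST. A sketch (the connector name is arbitrary; connector.class and database.port are spelled out because the REST API expects the full config, and database.server.id is set explicitly here, though Debezium picks a random one if omitted):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "school-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "password",
    "database.server.id": "5793",
    "database.server.name": "school",
    "database.include.list": "school",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.school"
  }
}'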

If the configuration is correct, we should see a green connector status and new topics in Kafka.

connect_1 | 2021-01-10 17:42:06,243 INFO MySQL|school|task Creating thread debezium-mysqlconnector-school-binlog-client [io.debezium.util.Threads]
connect_1 | 2021-01-10 17:42:06,246 INFO MySQL|school|task Creating thread debezium-mysqlconnector-school-binlog-client [io.debezium.util.Threads]
connect_1 | Jan 10, 2021 5:42:06 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
connect_1 | INFO: Connected to mysql:3306 at binlog.000001/156 (sid:5793, cid:14)
connect_1 | 2021-01-10 17:42:06,262 INFO MySQL|school|binlog Connected to MySQL binlog at mysql:3306, starting at binlog file 'binlog.000001', pos=156, skipping 0 events plus 0 rows [io.debezium.connector.mysql.BinlogReader]
connect_1 | 2021-01-10 17:42:06,262 INFO MySQL|school|task Waiting for keepalive thread to start [io.debezium.connector.mysql.BinlogReader]
connect_1 | 2021-01-10 17:42:06,263 INFO MySQL|school|binlog Creating thread debezium-mysqlconnector-school-binlog-client [io.debezium.util.Threads]
connect_1 | 2021-01-10 17:42:06,363 INFO MySQL|school|task Keepalive thread is running [io.debezium.connector.mysql.BinlogReader]

Adding data to MySQL

You can access the MySQL console in Docker Compose with the following command.

sudo docker-compose exec mysql mysql -uroot -p

The meaning of the data is irrelevant here. I took ready-made SQL snippets from the Logstash-to-Elasticsearch integration in my Elastic Stack course.

use school;

-- Note: `groups` is a reserved word in MySQL 8, so it has to be quoted with backticks
CREATE TABLE IF NOT EXISTS `groups` (
  group_id int(11) NOT NULL AUTO_INCREMENT,
  group_number char(4) NOT NULL,
  description VARCHAR(100),
  something_important int(11),
  modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (group_id),
  KEY group_number (group_number)
) ENGINE=InnoDB;

INSERT INTO `groups`
  (group_number, description, something_important)
VALUES
  ('1A', 'some group', 100),
  ('2A', 'some group', 102),
  ('3A', 'some group', 101),
  ('1B', 'some group', 123),
  ('2B', 'some group', 133),
  ('3B', 'some group', 144);

UPDATE school.`groups`
SET
  description = 'updated'
WHERE
  group_id % 2 = 0;
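To see a delete as well, remove a row. Debezium should then publish an event with op "d" (followed by a tombstone message):

DELETE FROM school.`groups` WHERE group_id = 1;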

Debezium in action
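With the connector running, each statement above shows up as an event on the topic school.school.groups (the Debezium naming convention is server.database.table). You can watch the stream with a console consumer, for example:

sudo docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic school.school.groups --from-beginning

A trimmed sketch of the payload of a single update event (the exact envelope depends on converter settings, and fields are abbreviated here):

{
  "before": { "group_id": 2, "group_number": "2A", "description": "some group", ... },
  "after":  { "group_id": 2, "group_number": "2A", "description": "updated", ... },
  "source": { "connector": "mysql", "name": "school", "table": "groups", ... },
  "op": "u",
  "ts_ms": 1610300526000
}

The before/after pair is exactly what Query-Based CDC could not give us.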

Summary

As you have just seen, CDC with Debezium is not that difficult, and it opens up a lot of possibilities. If you’re interested in this topic, it’s worth listening to a podcast from Confluent featuring the co-author of Debezium.

Software Developer, Big Data Engineer, Blogger (https://wiadrodanych.pl), Amateur Cyclist & Triathlete, @maciej_szymczyk
