Change Data Capture — Convert your database into a stream with Debezium
Have you ever thought about creating a stream from database operations? In this story, you will learn what Change Data Capture is and how to use it when planning your system architecture. In the practical part, we will see Debezium in action.
What is Change Data Capture?
Change Data Capture is a process of detecting changes made to the database. The changes can then be streamed and integrated with other databases and systems. In other words: we receive a stream of events from our database.
This allows us to make faster and more accurate decisions based on data (Stream Processing and Streaming ETL). It does not overload systems and networks as classical solutions do. Often it is the only reasonable solution to upgrade legacy systems.
Types of Change Data Capture
What if I query the database every 5 seconds? Is this Change Data Capture? In a way, yes, but this polling method has its drawbacks:
- It requires proper preparation of the table schema: a column holding the record's modification time and a mechanism that keeps it updated;
- We do not detect all operations. Deletions require the soft-delete trick: marking the record with a delete flag and physically removing it later with a separate mechanism;
- We do not detect changes in the table structure;
- We can overload the database or network with queries if there is a lot of data;
- We do not see the previous state of a modified record.
Do these disadvantages rule out the approach? It all depends on the context. For example, Logstash can synchronize Elasticsearch with a database this way. It is a simple solution, and if it meets our requirements, there is no point in maintaining yet another technology.
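The polling approach can be sketched in a few lines. Below is a toy example using SQLite; the table, column names, and values are assumptions for illustration, and the application itself must keep `modification_time` up to date:

```python
import sqlite3

# A toy sketch of query-based CDC (polling). The application must
# maintain modification_time itself for this to work at all.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE groups (
        group_id INTEGER PRIMARY KEY,
        group_number TEXT NOT NULL,
        modification_time REAL NOT NULL
    )
""")

def poll_changes(conn, last_seen):
    """Return rows modified after last_seen plus the new watermark."""
    rows = conn.execute(
        "SELECT group_id, group_number, modification_time FROM groups "
        "WHERE modification_time > ? ORDER BY modification_time",
        (last_seen,),
    ).fetchall()
    watermark = rows[-1][2] if rows else last_seen
    return rows, watermark

conn.execute("INSERT INTO groups VALUES (1, '1A', 100.0)")
changes, watermark = poll_changes(conn, last_seen=0.0)
print(changes)  # → [(1, '1A', 100.0)]
# Drawbacks in action: a hard DELETE leaves nothing to poll for,
# and after an UPDATE we never see the row's previous state.
```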
Generally speaking, databases record every transaction and modification in a transaction log. This log is a key component: in case of failure, it allows the database to be restored to a consistent state.
By reading such a log, we can detect all write operations, changes, and deletions of records, as well as changes to the table schema. The data can be sent one event at a time or in batches. The load on the database is negligible, certainly less than in the case of Query-Based CDC.
Each database approaches Change Data Capture in its own way. You can find information on how SQL Server does it here. In the second part of this article, we will use Debezium, an open and distributed CDC platform, and a MySQL database.
How can I use this?
Writing to multiple sources
We start with a simple application and a single database. With time, you add cache (Redis, Memcached), search engine (Elasticsearch, Solr), queues (Kafka, RabbitMQ), microservices, and multiple databases (Polyglot Persistence).
Ultimately, the data has multiple targets. Sending a record to all databases through a single service complicates the logic and makes it difficult to maintain consistency (Two Phase Commit also has its issues). It may be too late to implement the Event Sourcing architecture in the application.
Have you ever seen data “integration” between databases based on
SELECT * FROM table ? I have. I didn’t know whether to laugh or cry 😉 . Change Data Capture would work well in this case too, and it would certainly be much less invasive.
ETL and Stream Processing
Creating a stream from a database allows us to apply stream processing technology to ETL processes. Architectures like lambda or kappa speed up decision-making. Additionally, not all operations can be parallelized: Amdahl’s Law tells us that even mostly parallel code does not scale linearly. The streaming approach distributes processing over time, which has a positive impact on resource utilization.
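To make the Amdahl’s Law point concrete, here is a quick sketch. With a parallel fraction p and n workers, the speedup is S(n) = 1 / ((1 − p) + p / n) — even a 95%-parallel job flattens out fast:

```python
# Amdahl's Law: speedup with parallel fraction p on n workers.
def amdahl_speedup(p: float, n: int) -> float:
    return 1.0 / ((1.0 - p) + p / n)

# 95%-parallel code on 128 cores yields only ~17x, far from 128x.
print(round(amdahl_speedup(0.95, 128), 1))  # → 17.4
```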
Debezium is an open and distributed CDC platform. In practice, it is a collection of connectors for Kafka Connect, and as such, the solution is based on Apache Kafka. There are connectors for MySQL, PostgreSQL, MongoDB, Cassandra, SQL Server, Oracle, Db2, and Vitess.
The newer the technology, the more difficult it is to run Hello World 😅. We will use Docker Compose with:
- Apache Kafka,
- AKHQ — Web GUI for Kafka,
- Kafka Connect with Debezium,
- MySQL version 8.
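A minimal docker-compose.yml along these lines could look as follows. This is a sketch, not the exact file used in the article: the image tags, passwords, and the AKHQ configuration block are assumptions you should adapt to your setup.

```yaml
version: "3"
services:
  zookeeper:
    image: debezium/zookeeper:1.4
  kafka:
    image: debezium/kafka:1.4
    depends_on: [zookeeper]
    environment:
      ZOOKEEPER_CONNECT: zookeeper:2181
  mysql:
    image: mysql:8
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: school
      MYSQL_USER: user
      MYSQL_PASSWORD: password
  connect:
    image: debezium/connect:1.4
    depends_on: [kafka, mysql]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: "1"
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
  akhq:
    image: tchiotludo/akhq
    ports: ["8080:8080"]
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            kafka:
              properties:
                bootstrap.servers: kafka:9092
              connect:
                - name: "connect"
                  url: http://connect:8083
```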
Configuring MySQL Debezium in Kafka Connect
I’ve dedicated a separate story to Kafka Connect. Now that we have AKHQ, let’s try to click something.
The configuration used is shown below. AKHQ prompts us for all possible options along with explanations.
tasks.max = 1
database.history.kafka.topic = schema-changes.school
database.hostname = mysql
database.user = user
database.password = password
database.server.name = school
database.include.list = school
database.history.kafka.bootstrap.servers = kafka:9092
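If you prefer to skip the GUI, the same options can be POSTed as JSON to Kafka Connect’s REST API (by default on port 8083). The connector name and the database.port value below are assumptions; the rest mirrors the configuration above, plus the required connector.class:

```json
{
  "name": "school-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "password",
    "database.server.name": "school",
    "database.include.list": "school",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.school"
  }
}
```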
If the configuration is correct, we should see a green connector status and new topics in Kafka.
connect_1 | 2021-01-10 17:42:06,243 INFO MySQL|school|task Creating thread debezium-mysqlconnector-school-binlog-client [io.debezium.util.Threads]
connect_1 | 2021-01-10 17:42:06,246 INFO MySQL|school|task Creating thread debezium-mysqlconnector-school-binlog-client [io.debezium.util.Threads]
connect_1 | Jan 10, 2021 5:42:06 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
connect_1 | INFO: Connected to mysql:3306 at binlog.000001/156 (sid:5793, cid:14)
connect_1 | 2021-01-10 17:42:06,262 INFO MySQL|school|binlog Connected to MySQL binlog at mysql:3306, starting at binlog file 'binlog.000001', pos=156, skipping 0 events plus 0 rows [io.debezium.connector.mysql.BinlogReader]
connect_1 | 2021-01-10 17:42:06,262 INFO MySQL|school|task Waiting for keepalive thread to start [io.debezium.connector.mysql.BinlogReader]
connect_1 | 2021-01-10 17:42:06,263 INFO MySQL|school|binlog Creating thread debezium-mysqlconnector-school-binlog-client [io.debezium.util.Threads]
connect_1 | 2021-01-10 17:42:06,363 INFO MySQL|school|task Keepalive thread is running [io.debezium.connector.mysql.BinlogReader]
Adding data to MySQL
You can access the MySQL console in Docker Compose with the following command.
sudo docker-compose exec mysql mysql -uroot -p
The meaning of the data itself is irrelevant here. I took ready-made SQL snippets from the Logstash integration of MySQL with Elasticsearch in my Elastic Stack course.
CREATE TABLE IF NOT EXISTS groups(
group_id int(11) NOT NULL AUTO_INCREMENT,
group_number char(4) NOT NULL,
description text, -- column types assumed; the original listing was truncated
something_important char(4),
modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (group_id),
KEY group_number (group_number)
);

INSERT INTO groups
(group_number, description, something_important)
VALUES ('1A', 'example description', 'abcd'); -- example values

UPDATE groups
SET description = 'updated'
WHERE group_id % 2 = 0;
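With the connector running, each captured UPDATE lands on the school.school.groups topic (server name, database, table) as a change event. A simplified payload, with fields trimmed and values illustrative, looks roughly like this:

```json
{
  "before": { "group_id": 2, "description": "example description" },
  "after":  { "group_id": 2, "description": "updated" },
  "source": { "connector": "mysql", "db": "school", "table": "groups" },
  "op": "u",
  "ts_ms": 1610300526000
}
```

Note the before/after pair: unlike the polling approach, log-based CDC gives us the previous state of the record for free.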
Debezium in action
As you have just seen, CDC with Debezium is not that difficult, and it opens up a lot of possibilities. If you’re interested in this topic, it’s worth listening to the Confluent podcast episode featuring the co-creator of Debezium.