Change Data Capture Pipeline Using Debezium
Real-time data replication and indexing are critical for building responsive and scalable systems. Change Data Capture (CDC) is a technique for tracking changes in a database and propagating them to downstream systems in near-real time.
In this blog, we’ll walk through setting up a CDC pipeline using Docker Compose. Our pipeline will sync changes from a PostgreSQL database to Elasticsearch and visualize them in Kibana.
Architecture Overview
Debezium source connector
Debezium sink connector
We will follow the middle part of the architecture, i.e., create a pipeline that takes changes from Postgres, pushes them to Elasticsearch, and finally lets us verify the data in Kibana.
Setting up the environment
Standing up the components using docker compose
We will primarily rely on Docker Compose to bring up the entire stack mentioned above.
[Embedded docker-compose file]
In the docker-compose file above, we need to specify a few critical things:
- Lines 6-8 specify the user that the Debezium source connector will use.
- Line 56 specifies the path where Debezium looks for plugins in order to connect to external systems. These external systems could be source systems (Postgres, MariaDB, etc.) or sink systems (Snowflake, S3, Elasticsearch, etc.).
- Line 60 mounts the plugins directory so that Debezium can use the plugins.
We can download the required plugins and store them, after extracting, under the plugins directory. Once we bring up the setup using compose, the command below should show that both the Postgres and Elasticsearch connectors are available.
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ curl -X GET http://localhost:8083/connector-plugins -s | python -m json.tool
[
    {
        "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type": "sink",
        "version": "14.1.2"
    },
    {
        "class": "io.debezium.connector.postgresql.PostgresConnector",
        "type": "source",
        "version": "2.5.4.Final"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "3.4.0"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "3.4.0"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "3.4.0"
    }
]
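The same check can be scripted rather than read by eye. What follows is a minimal sketch, assuming the Connect REST API is reachable at http://localhost:8083 as in the command above. The helper names `missing_plugins` and `fetch_plugins` are illustrative and are not part of the original project.

```python
import json
from urllib.request import urlopen

# Connector classes this pipeline needs, taken from the plugin listing above.
REQUIRED_PLUGINS = {
    "io.debezium.connector.postgresql.PostgresConnector",
    "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
}


def missing_plugins(plugins, required=REQUIRED_PLUGINS):
    """Return the required connector classes absent from a /connector-plugins response."""
    installed = {plugin["class"] for plugin in plugins}
    return sorted(set(required) - installed)


def fetch_plugins(base_url="http://localhost:8083"):
    """Fetch the plugin list from the Kafka Connect REST API (base URL is an assumption)."""
    with urlopen(f"{base_url}/connector-plugins") as response:
        return json.load(response)
```

Calling `missing_plugins(fetch_plugins())` against a running stack should return an empty list when both connectors were picked up from the plugins directory.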
Setting up the pipeline
We will also use the script below to bootstrap a few things as part of the setup. This includes:
Creating tables
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ python app.py create-table
Table 'employees' created successfully.
Populating data (optional)
We can insert data after setting up source and sink connectors as well.
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ python app.py insert-data
Inserted: 1, Alice, Engineer
Inserted: 2, Bob, Analyst
Inserted: 3, Charlie, Manager
Setting up source connector
Once we set up the connector, Debezium starts tracking changes and pushing them to Kafka through Kafka Connect. With Postgres this happens using replication slots; the mechanism differs based on the source system.
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ python app.py register-connector
Connector registered successfully.
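For readers without the script at hand, registering a source connector boils down to one POST against the Kafka Connect REST API. The sketch below is not the author's `app.py`. The connector name, database name, topic prefix and table are taken from the outputs in this post, while the hostname, credentials and function names are placeholders assumed for illustration.

```python
import json
from urllib.request import Request, urlopen


def build_source_connector(db_host="postgres", db_user="postgres", db_password="postgres"):
    """Build a registration payload for the Debezium PostgreSQL connector.

    Host, user and password are assumed placeholders; adjust them to the
    values in your compose file.
    """
    return {
        "name": "postgres-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",  # logical decoding plugin built into Postgres 10+
            "database.hostname": db_host,
            "database.port": "5432",
            "database.user": db_user,
            "database.password": db_password,
            "database.dbname": "cdc_db",
            "topic.prefix": "cdc",  # topics are named <prefix>.<schema>.<table>
            "table.include.list": "public.employees",
        },
    }


def register_connector(payload, base_url="http://localhost:8083"):
    """POST the payload to Kafka Connect and return the HTTP status code."""
    request = Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(request) as response:
        return response.status
```

With the prefix `cdc` and the table `public.employees`, change events land on the topic `cdc.public.employees`, which is also the index name that shows up in Elasticsearch later in this post.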
Verify that the connector has been registered
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ curl http://localhost:8083/connectors/postgres-connector/status -s | python -m json.tool
{
    "name": "postgres-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "172.19.0.7:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "172.19.0.7:8083"
        }
    ],
    "type": "source"
}
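In Kafka Connect, the connector state and the task states are reported separately, so it is worth checking both. A small sketch of such a check over the status document above (the function name is ours, not from the original script):

```python
def connector_is_healthy(status):
    """Return True when the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    connector_running = status.get("connector", {}).get("state") == "RUNNING"
    tasks_running = bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
    return connector_running and tasks_running


# Status document copied from the response above.
sample_status = {
    "name": "postgres-connector",
    "connector": {"state": "RUNNING", "worker_id": "172.19.0.7:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "172.19.0.7:8083"}],
    "type": "source",
}
print(connector_is_healthy(sample_status))  # prints True
```

An empty task list is treated as unhealthy here, on the assumption that a connector without tasks is not moving any data.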
Setting up sink connector
Finally, we set up the sink connector. This tells Kafka Connect to pull records from Kafka and push them to Elasticsearch; it is essentially a Kafka consumer.
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ python app.py register-sink-connector
Elasticsearch Sink Connector registered successfully.
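As a rough idea of what a sink registration payload can look like, here is a minimal sketch for the Confluent Elasticsearch sink connector. The connector name and the Elasticsearch URL are assumptions. The `key.ignore` setting is inferred from the document IDs of the form `topic+partition+offset` that appear in the search results further down, not read from the original script.

```python
def build_sink_connector(es_url="http://elasticsearch:9200"):
    """Build a registration payload for the Elasticsearch sink connector.

    The connector name and es_url are assumed placeholders.
    """
    return {
        "name": "elasticsearch-sink",
        "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "topics": "cdc.public.employees",  # topic written by the source connector
            "connection.url": es_url,
            "key.ignore": "true",  # derive document IDs from topic, partition and offset
            "schema.ignore": "true",  # let Elasticsearch infer the mapping
        },
    }


def document_id(topic, partition, offset):
    """Reproduce the ID format the sink uses when key.ignore is true."""
    return f"{topic}+{partition}+{offset}"
```

Because every Kafka record gets its own ID under this setting, an update to a row is indexed as a new document instead of overwriting the earlier one.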
[Embedded app.py script]
Here, we need to note one thing: line 40 sets REPLICA IDENTITY FULL for the employees table once the table is created. This is needed when we want to capture the full diff of each row, irrespective of whether the primary key changed.
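By default, Postgres records only the primary key of the old row in the write-ahead log, which is why FULL is needed for a complete before image. The sketch below shows one way to apply and verify the setting through any DB-API cursor (for example one from psycopg2). The function names are illustrative, and the code is not taken from the original script.

```python
# Meaning of the pg_class.relreplident codes in the PostgreSQL system catalog.
REPLICA_IDENTITY = {"d": "DEFAULT", "n": "NOTHING", "f": "FULL", "i": "USING INDEX"}


def set_replica_identity_full(cursor, table="employees"):
    """Run ALTER TABLE ... REPLICA IDENTITY FULL on a DB-API cursor."""
    # The table name is interpolated directly, so only pass trusted identifiers.
    cursor.execute(f"ALTER TABLE {table} REPLICA IDENTITY FULL")


def get_replica_identity(cursor, table="employees"):
    """Return the replica identity of a table as a readable name."""
    cursor.execute("SELECT relreplident FROM pg_class WHERE relname = %s", (table,))
    (code,) = cursor.fetchone()
    return REPLICA_IDENTITY[code]
```

After `set_replica_identity_full(cursor)` has been committed, `get_replica_identity(cursor)` should report `FULL` for the employees table.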
Testing the pipeline
Now that we have our entire pipeline ready, we can start making changes in the Postgres database and watch them come out the other side in Elasticsearch.
Verify initial data
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ curl --location 'http://localhost:9200/cdc.public.employees/_count' -s | python -m json.tool
{
    "count": 3,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    }
}
This shows an index count of 3, matching the initial records that we added during the insert-data stage of the pipeline setup.
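Since replication is near real time rather than instantaneous, the count can lag for a moment right after a change. If we want to script this check, a polling helper along these lines can wait for the expected number of documents. This is a sketch: `wait_for_count` and `fetch_count` are hypothetical helpers, and the Elasticsearch URL is the one used in the command above.

```python
import json
import time
from urllib.request import urlopen


def fetch_count(index="cdc.public.employees", base_url="http://localhost:9200"):
    """Read the document count of an index from the Elasticsearch _count endpoint."""
    with urlopen(f"{base_url}/{index}/_count") as response:
        return json.load(response)["count"]


def wait_for_count(get_count, expected, timeout=30.0, interval=1.0):
    """Poll get_count() until it returns at least `expected` or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        count = get_count()
        if count >= expected:
            return count
        if time.monotonic() >= deadline:
            raise TimeoutError(f"expected {expected} documents, saw {count}")
        time.sleep(interval)
```

For the initial snapshot this would be called as `wait_for_count(fetch_count, 3)`.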
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ curl --location 'http://localhost:9200/cdc.public.employees/_search' -s | python -m json.tool
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "cdc.public.employees",
"_id": "cdc.public.employees+0+1",
"_score": 1.0,
"_source": {
"before": null,
"after": {
"id": 2,
"name": "Bob",
"role": "Analyst",
"created_at": 1733947180335591
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1733947265959,
"snapshot": "true",
"db": "cdc_db",
"sequence": "[null,\"26785888\"]",
"schema": "public",
"table": "employees",
"txId": 742,
"lsn": 26785888,
"xmin": null
},
"op": "r",
"ts_ms": 1733947266036,
"transaction": null
}
},
{
"_index": "cdc.public.employees",
"_id": "cdc.public.employees+0+2",
"_score": 1.0,
"_source": {
"before": null,
"after": {
"id": 3,
"name": "Charlie",
"role": "Manager",
"created_at": 1733947182347031
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1733947265959,
"snapshot": "last",
"db": "cdc_db",
"sequence": "[null,\"26785888\"]",
"schema": "public",
"table": "employees",
"txId": 742,
"lsn": 26785888,
"xmin": null
},
"op": "r",
"ts_ms": 1733947266036,
"transaction": null
}
},
{
"_index": "cdc.public.employees",
"_id": "cdc.public.employees+0+0",
"_score": 1.0,
"_source": {
"before": null,
"after": {
"id": 1,
"name": "Alice",
"role": "Engineer",
"created_at": 1733947178324531
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1733947265959,
"snapshot": "first",
"db": "cdc_db",
"sequence": "[null,\"26785888\"]",
"schema": "public",
"table": "employees",
"txId": 742,
"lsn": 26785888,
"xmin": null
},
"op": "r",
"ts_ms": 1733947266034,
"transaction": null
}
}
]
}
}
Here, the before field is null, which indicates that there was no data before this change, and the after field holds the row's id, name, role and created_at values.
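Two other parts of each event are worth decoding. The op field is Debezium's operation code, where r marks a row read during the initial snapshot. The created_at value appears to be microseconds since the Unix epoch, which is our reading of the 16-digit numbers above and not something the original script states. A small sketch with illustrative helper names:

```python
from datetime import datetime, timezone

# Debezium operation codes found in the "op" field of a change event.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def micros_to_datetime(micros):
    """Convert microseconds since the Unix epoch to a timezone-aware UTC datetime."""
    seconds, remainder = divmod(micros, 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=remainder)


def describe_event(event):
    """Summarize a change event: operation name, row id and creation time."""
    row = event["after"] or event["before"]  # deletes only carry a before image
    return {
        "operation": OPERATIONS.get(event["op"], event["op"]),
        "id": row["id"],
        "created_at": micros_to_datetime(row["created_at"]).isoformat(),
    }
```

Applied to Bob's snapshot event above, `describe_event` reports a snapshot read for id 2 with a creation time on 2024-12-11 (UTC).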
Send some updates
Here, we generate a random number and update the role of each of the 3 employees
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ python app.py update-data
Updated role for 1 to Admin 82.
Updated role for 2 to Manager 54.
Updated role for 3 to Developer 55.
Verify changes replicated
Next, we check the Elasticsearch index count and see 3 new documents, for a total of 6.
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ curl --location 'http://localhost:9200/cdc.public.employees/_count' -s | python -m json.tool
{
    "count": 6,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    }
}
We can also see below that, this time, the before field is not null for the 3 new entries. It shows how each row changed as part of the update.
(venv) ➜ cdc-pipeline-using-debezium git:(main) ✗ curl --location 'http://localhost:9200/cdc.public.employees/_search' -s | python -m json.tool
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 6,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "cdc.public.employees",
"_id": "cdc.public.employees+0+1",
"_score": 1.0,
"_source": {
"before": null,
"after": {
"id": 2,
"name": "Bob",
"role": "Analyst",
"created_at": 1733947180335591
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1733947265959,
"snapshot": "true",
"db": "cdc_db",
"sequence": "[null,\"26785888\"]",
"schema": "public",
"table": "employees",
"txId": 742,
"lsn": 26785888,
"xmin": null
},
"op": "r",
"ts_ms": 1733947266036,
"transaction": null
}
},
{
"_index": "cdc.public.employees",
"_id": "cdc.public.employees+0+2",
"_score": 1.0,
"_source": {
"before": null,
"after": {
"id": 3,
"name": "Charlie",
"role": "Manager",
"created_at": 1733947182347031
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1733947265959,
"snapshot": "last",
"db": "cdc_db",
"sequence": "[null,\"26785888\"]",
"schema": "public",
"table": "employees",
"txId": 742,
"lsn": 26785888,
"xmin": null
},
"op": "r",
"ts_ms": 1733947266036,
"transaction": null
}
},
{
"_index": "cdc.public.employees",
"_id": "cdc.public.employees+0+0",
"_score": 1.0,
"_source": {
"before": null,
"after": {
"id": 1,
"name": "Alice",
"role": "Engineer",
"created_at": 1733947178324531
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1733947265959,
"snapshot": "first",
"db": "cdc_db",
"sequence": "[null,\"26785888\"]",
"schema": "public",
"table": "employees",
"txId": 742,
"lsn": 26785888,
"xmin": null
},
"op": "r",
"ts_ms": 1733947266034,
"transaction": null
}
},
{
"_index": "cdc.public.employees",
"_id": "cdc.public.employees+0+4",
"_score": 1.0,
"_source": {
"before": {
"id": 2,
"name": "Bob",
"role": "Analyst",
"created_at": 1733947180335591
},
"after": {
"id": 2,
"name": "Bob",
"role": "Manager 54",
"created_at": 1733947180335591
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1733948856640,
"snapshot": "false",
"db": "cdc_db",
"sequence": "[null,\"26786616\"]",
"schema": "public",
"table": "employees",
"txId": 743,
"lsn": 26786616,
"xmin": null
},
"op": "u",
"ts_ms": 1733948856896,
"transaction": null
}
},
{
"_index": "cdc.public.employees",
"_id": "cdc.public.employees+0+5",
"_score": 1.0,
"_source": {
"before": {
"id": 3,
"name": "Charlie",
"role": "Manager",
"created_at": 1733947182347031
},
"after": {
"id": 3,
"name": "Charlie",
"role": "Developer 55",
"created_at": 1733947182347031
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1733948856640,
"snapshot": "false",
"db": "cdc_db",
"sequence": "[null,\"26786744\"]",
"schema": "public",
"table": "employees",
"txId": 743,
"lsn": 26786744,
"xmin": null
},
"op": "u",
"ts_ms": 1733948856896,
"transaction": null
}
},
{
"_index": "cdc.public.employees",
"_id": "cdc.public.employees+0+3",
"_score": 1.0,
"_source": {
"before": {
"id": 1,
"name": "Alice",
"role": "Engineer",
"created_at": 1733947178324531
},
"after": {
"id": 1,
"name": "Alice",
"role": "Admin 82",
"created_at": 1733947178324531
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "cdc",
"ts_ms": 1733948856640,
"snapshot": "false",
"db": "cdc_db",
"sequence": "[null,\"26786216\"]",
"schema": "public",
"table": "employees",
"txId": 743,
"lsn": 26786216,
"xmin": null
},
"op": "u",
"ts_ms": 1733948856895,
"transaction": null
}
}
]
}
}
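The index now holds six change events for three rows, because each event is stored as its own document. Two small helpers make such events easier to work with: one lists the columns that differ between before and after, the other collapses a list of events into the latest row per id. This is a sketch under the assumption that ordering by source.lsn is sufficient for a single table; the function names are ours.

```python
def changed_fields(event):
    """Map each column that differs between before and after to an (old, new) pair."""
    before = event.get("before") or {}
    after = event.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(before.keys() | after.keys())
        if before.get(column) != after.get(column)
    }


def latest_rows(events):
    """Collapse change events into the most recent row per id, ordered by LSN."""
    rows = {}
    for event in sorted(events, key=lambda e: e["source"]["lsn"]):
        if event["op"] == "d":
            rows.pop(event["before"]["id"], None)  # a delete removes the row
        else:
            rows[event["after"]["id"]] = event["after"]
    return rows
```

For Bob's update event above, `changed_fields` returns only the role column, with the old value `Analyst` and the new value `Manager 54`.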
Lookup data in Kibana (optional)
Finally, we can open Kibana, look up the index named cdc.public.employees, and find our data.
Conclusion
Key Takeaways
- CDC pipelines provide near-real-time synchronization between databases and downstream systems.
- Debezium and Kafka make it easy to implement CDC pipelines.
- Elasticsearch and Kibana are powerful tools for indexing and visualization.