I am exploring Debezium as a Change Data Capture (CDC) tool to introduce into my team/project. I wanted to keep it simple, so I chose to go with the embedded engine.
I wasn't able to find many examples of using MongoDB as a source database (especially not with the embedded engine). As a result, I had to scour through various documentation and blogs to figure out how to configure Spring Boot with the Debezium (embedded) Engine, connecting to MongoDB as the source and Elasticsearch as the destination.
I hope this will serve as a guide to others who face the same issue.
End Goal
What I wanted to achieve is pretty straightforward:
- host an embedded Debezium engine on top of spring-boot
- connect to a MongoDB instance (with a replica set) through debezium-connector-mongodb
- subscribe to the change events (RecordChangeEvent)
- index them into Elasticsearch
Set Up the Project
Prerequisites
- MongoDB Server: local or cloud, but it must be a replica set
- Elasticsearch Server: local or cloud
Since I had trouble configuring MongoDB with a replica set for a local setup, I will be using a free instance from MongoDB Atlas. Elasticsearch, on the other hand, is straightforward to configure locally; I spin it up via the docker-compose script.
It is a bummer that MongoDB Atlas only offers version 5.x for free instances, rather than the latest and greatest version. This means I am not able to fully test Change Streams with Document Pre- and Post-Images, a feature that is only available from version 6 onwards.
Initialize a Spring Boot project
Go to start.spring.io and generate your Spring Boot project. Alternatively, click on this link for the preset that I've configured.
While it is not necessary to bring in spring-boot-starter-web, it is an easy way to pull in the necessary dependencies. We don't need the application to run as a web application, so we can turn that off by adding the configuration below (e.g. in application.yml):
spring:
  main:
    web-application-type: none
Note that at the time of writing, while Spring Boot 3.x is available, I am still using Spring Boot 2.7.9 for my setup.
Add Debezium dependencies
Following the Debezium documentation, we need to add the following dependencies:
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>
At the time of writing, I am using Debezium 2.1.2.Final.
As I am connecting to MongoDB as my source, I also needed to add the connector dependency:
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mongodb</artifactId>
    <version>${version.debezium}</version>
</dependency>
Override Mongo Version
If you are also using Spring Boot 2.7.x, make sure you override the MongoDB driver version to at least 4.7.1, which Debezium 2.1.2.Final depends on. To override it, simply add mongodb.version under the properties section in pom.xml:
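<properties>
    <!--
        Spring Boot 2.7.9 manages the MongoDB driver at 4.6.1, but Debezium relies on 4.7.1.
        Pin it here so Spring Boot's dependency management does not downgrade the driver Debezium needs.
    -->
    <mongodb.version>4.7.1</mongodb.version>
    <!-- must match the ${version.debezium} placeholder used in the Debezium dependencies -->
    <version.debezium>2.1.2.Final</version.debezium>
</properties>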
Initially, I thought there was a bug in Debezium; I didn't realize the driver version was being overridden by Spring Boot. Don't make the same rookie mistake as me.
Debezium Configuration
As I understand it, there are two types of properties: engine and connector.
Engine
These properties are required no matter which connector you use with Debezium.
Connector
These properties are specific to the database you are connecting to, so refer to the respective connector documentation for what needs to be set.
There is a huge list of properties that can be set, but I am only configuring the minimum needed to get it working. Many of the defaults are sensible, so you don't have to override them unless you need to.
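Because the engine is ultimately handed a plain java.util.Properties (the listener further down calls asProperties() on the Configuration bean and passes the result to the builder's using(...) method), the two groups can also be kept apart with the JDK alone. The sketch below is one possible alternative to a single hard-coded chain, not the approach used in this post: the DebeziumProps helper and the environment variable names MONGODB_CONNECTION_STRING, MONGODB_USER and MONGODB_PASSWORD are hypothetical, while the property keys are the ones used in the configuration class below.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

// Hypothetical helper: the same minimal settings as the configuration class,
// split into engine and connector groups. Env variable names are assumptions.
final class DebeziumProps {
    private DebeziumProps() {}

    // Engine properties: required whichever connector is used.
    static Properties engine(String offsetsFile) {
        Properties p = new Properties();
        p.setProperty("name", "sbd-mongodb");
        p.setProperty("connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
        p.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        p.setProperty("offset.storage.file.filename", offsetsFile);
        p.setProperty("offset.flush.interval.ms", "60000");
        return p;
    }

    // Connector properties: specific to the MongoDB connector.
    // Secrets come from a map such as System.getenv() instead of source code.
    static Properties mongodb(Map<String, String> env) {
        Properties p = new Properties();
        p.setProperty("mongodb.connection.string",
                Objects.requireNonNull(env.get("MONGODB_CONNECTION_STRING"), "MONGODB_CONNECTION_STRING"));
        p.setProperty("topic.prefix", "sbd-mongodb-connector");
        copyIfPresent(env, "MONGODB_USER", p, "mongodb.user");
        copyIfPresent(env, "MONGODB_PASSWORD", p, "mongodb.password");
        p.setProperty("mongodb.ssl.enabled", "true");
        p.setProperty("database.include.list", "source");
        return p;
    }

    // Combines both groups; connector keys win if a key appears in both.
    static Properties merged(Properties engine, Properties connector) {
        Properties all = new Properties();
        all.putAll(engine);
        all.putAll(connector);
        return all;
    }

    private static void copyIfPresent(Map<String, String> env, String envKey, Properties p, String propKey) {
        String value = env.get(envKey);
        if (value != null) {
            p.setProperty(propKey, value);
        }
    }
}
```

The merged Properties could then go straight into DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(...), in place of mongodbConnector.asProperties().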
import java.io.File;
import java.io.IOException;
import org.springframework.context.annotation.Bean;
import io.debezium.config.Configuration;
@org.springframework.context.annotation.Configuration
public class DebeziumConnectorConfig {
    // based off https://debezium.io/documentation/reference/2.1/connectors/mongodb.html#mongodb-connector-properties
    @Bean
    public Configuration mongodbConnector() throws IOException {
        // a temp file is fine for this demo; a new one is created on every start, so offsets do not survive restarts
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        return Configuration.create()
                // engine properties
                .with("name", "sbd-mongodb")
                .with("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("offset.flush.interval.ms", "60000")
                // connector specific properties
                .with("mongodb.connection.string", "mongodb+srv://sb-debezium.1ewsyzd.mongodb.net")
                .with("topic.prefix", "sbd-mongodb-connector")
                .with("mongodb.user", "bwgjoseph")
                .with("mongodb.password", "<redacted>")
                .with("mongodb.ssl.enabled", "true") // default false
                .with("database.include.list", "source") // default empty
                .with("snapshot.delay.ms", "100")
                .with("errors.log.include.messages", "true")
                .build();
    }
}
A few points to note:
- name has to be unique across the connectors
- mongodb.hosts is not required if mongodb.connection.string is configured
- offset.storage is necessary to persist offsets. For this demo, it is alright to create the file in a temporary folder, but in practice it should be stored in a persistent volume; otherwise each start gets a fresh file and the previously recorded offsets are lost
- mongodb.ssl.enabled must be set to true if connecting to MongoDB Atlas
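For the persistent option, a minimal sketch using only java.nio.file: the offsets file is resolved inside a fixed directory that is created if missing, so every start points the engine at the same file. The OffsetStorage helper, the directory (for example a mounted volume at /data/debezium) and the offsets.dat file name are assumptions, not something Debezium prescribes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: resolves a stable offsets file inside a persistent directory.
final class OffsetStorage {
    private OffsetStorage() {}

    static Path offsetsFile(Path dataDir) {
        try {
            Files.createDirectories(dataDir); // no-op if the directory already exists
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Same path on every start; the file itself is written by the offset store.
        return dataDir.resolve("offsets.dat");
    }
}
```

In the configuration class this would replace the temp file, e.g. .with("offset.storage.file.filename", OffsetStorage.offsetsFile(Path.of("/data/debezium")).toString()).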
Set Up the Debezium Service
Now that we have everything set up, it is time to configure the actual service to start and listen for changes.
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Service;
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
public class DebeziumSourceEventListener {
    // single thread that runs the blocking engine loop
    private final ExecutorService executor;
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;
    public DebeziumSourceEventListener(Configuration mongodbConnector) {
        this.executor = Executors.newSingleThreadExecutor();
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(mongodbConnector.asProperties())
                .notifying(this::handleChangeEvent)
                .build();
    }
    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
        Struct sourceRecordKey = (Struct) sourceRecord.key();
        // value is null for the second event emitted on delete
        Struct sourceRecordValue = (Struct) sourceRecord.value();
        log.info("Key = '{}' value = '{}'", sourceRecordKey, sourceRecordValue);
    }
    @PostConstruct
    private void start() {
        // run() blocks until the engine is closed, so it gets its own thread
        this.executor.execute(debeziumEngine);
    }
    @PreDestroy
    private void stop() throws IOException, InterruptedException {
        if (this.debeziumEngine != null) {
            this.debeziumEngine.close();
        }
        // release the executor thread as well, otherwise it outlives the engine
        this.executor.shutdown();
        if (!this.executor.awaitTermination(5, TimeUnit.SECONDS)) {
            this.executor.shutdownNow();
        }
    }
}
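The start/stop pair above follows a common pattern: run a blocking task on its own thread, then on shutdown ask it to stop and release the thread. Executors.newSingleThreadExecutor() creates a non-daemon worker that stays alive after the task returns unless the executor is shut down, which can keep the JVM from exiting. The stdlib-only sketch below shows that sequence; BlockingTask is a hypothetical stand-in for the Debezium engine (whose run() likewise blocks until close() is called) and is not Debezium code.

```java
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the engine: run() blocks until close() is called.
final class BlockingTask implements Runnable, Closeable {
    private final CountDownLatch stopped = new CountDownLatch(1);

    @Override
    public void run() {
        try {
            stopped.await(); // the real engine would be polling the connector here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        stopped.countDown();
    }
}

final class Lifecycle {
    private Lifecycle() {}

    // Mirrors start/stop: run on one thread, close the task, then shut the executor down.
    static boolean runAndStop(BlockingTask task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(task);  // like start()
        task.close();            // like stop(): makes run() return
        executor.shutdown();     // lets the idle worker thread exit
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Leaving out executor.shutdown() makes awaitTermination time out and return false, since the worker thread keeps waiting for more tasks.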
A quick demo
Let's imagine you have another service (endpoint) that creates a Person and saves it into MongoDB:
import java.util.List;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.TypeAlias;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
@Getter
@ToString
@Builder(toBuilder = true)
@TypeAlias(value = "person")
@Document(collection = "persons")
public class Person {
    @Id
    private String id;
    private String name;
    private String description;
    private List<String> hashTags;
}
Start
We will now start our service
./mvnw spring-boot:run
Create
When a new document is created, the following logs are shown:
[1] 2023-03-12 15:22:05.819 INFO 281440 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask : 1 records sent during previous 00:01:50.619, last recorded offset of {rs=atlas-b15fi5-shard-0, server_id=sbd-mongodb-connector} partition is {sec=1678605725, ord=4, transaction_id=null, resume_token=82640D7D9D000000042B022C0100296E5A1004CCDB188FE00C4DF2852FF333EE031E5646645F69640064640D7D9DC625D06F305A0E930004}
[2] 2023-03-12 15:22:05.819 INFO 281440 --- [pool-1-thread-1] c.b.s.DebeziumSourceEventListener : Key = 'Struct{id={"$oid": "640d7d9dc625d06f305a0e93"}}' value = 'Struct{after={"_id": {"$oid": "640d7d9dc625d06f305a0e93"},"name": "joseph","description": "hello world","hashTags": ["hello","world"]},source=Struct{version=2.1.2.Final,connector=mongodb,name=sbd-mongodb-connector,ts_ms=1678605725000,db=source,rs=atlas-b15fi5-shard-0,collection=persons,ord=4},op=c,ts_ms=1678605725586}'
[1] Indicates the resume token information
[2] The created event's key/value pair
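The key in these logs carries the document id as an extended-JSON string, {"$oid": "..."}, whereas a destination such as Elasticsearch would want the plain 24-character hex. A minimal regex sketch for pulling it out, assuming the key's id field always has exactly that shape for ObjectId ids (documents with string or numeric _id values would need a different path); ObjectIds is a hypothetical helper.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the ObjectId hex from a key id such as
// {"$oid": "640d7d9dc625d06f305a0e93"}.
final class ObjectIds {
    private static final Pattern OID =
            Pattern.compile("\\{\\s*\"\\$oid\"\\s*:\\s*\"([0-9a-fA-F]{24})\"\\s*\\}");

    private ObjectIds() {}

    static Optional<String> hexFromKeyId(String keyId) {
        Matcher m = OID.matcher(keyId.trim());
        return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
    }
}
```

Inside handleChangeEvent this could be fed with sourceRecordKey.getString("id"), using Kafka Connect's Struct.getString.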
Update
Let's try to update the name field. Once updated, the following logs are shown:
[1] 2023-03-12 15:26:43.407 INFO 283436 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask : 1 records sent during previous 00:00:23.096, last recorded offset of {rs=atlas-b15fi5-shard-0, server_id=sbd-mongodb-connector} partition is {sec=1678606003, ord=5, transaction_id=null, resume_token=82640D7EB3000000052B022C0100296E5A1004CCDB188FE00C4DF2852FF333EE031E5646645F69640064640D7D9DC625D06F305A0E930004}
[2] 2023-03-12 15:26:43.410 INFO 283436 --- [pool-1-thread-1] c.b.s.DebeziumSourceEventListener : Key = 'Struct{id={"$oid": "640d7d9dc625d06f305a0e93"}}' value = 'Struct{after={"_id": {"$oid": "640d7d9dc625d06f305a0e93"},"name": "bwgjoseph","description": "hello world","hashTags": ["hello","world"]},updateDescription=Struct{updatedFields={"name": "bwgjoseph"}},source=Struct{version=2.1.2.Final,connector=mongodb,name=sbd-mongodb-connector,ts_ms=1678606003000,db=source,rs=atlas-b15fi5-shard-0,collection=persons,ord=5},op=u,ts_ms=1678606003351}'
[1] Indicates the resume token information
[2] The updated event's key/value pair, where we can see the name is now bwgjoseph (changed from joseph); updateDescription.updatedFields lists only the field that changed
Delete
When the document is deleted, we receive 2 events:
[1] 2023-03-12 15:27:24.555 INFO 283436 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask : 2 records sent during previous 00:00:41.148, last recorded offset of {rs=atlas-b15fi5-shard-0, server_id=sbd-mongodb-connector} partition is {sec=1678606044, ord=3, transaction_id=null, resume_token=82640D7EDC000000032B022C0100296E5A1004CCDB188FE00C4DF2852FF333EE031E5646645F69640064640D7D9DC625D06F305A0E930004}
[2] 2023-03-12 15:27:24.557 INFO 283436 --- [pool-1-thread-1] c.b.s.DebeziumSourceEventListener : Key = 'Struct{id={"$oid": "640d7d9dc625d06f305a0e93"}}' value = 'Struct{source=Struct{version=2.1.2.Final,connector=mongodb,name=sbd-mongodb-connector,ts_ms=1678606044000,db=source,rs=atlas-b15fi5-shard-0,collection=persons,ord=3},op=d,ts_ms=1678606044425}'
[3] 2023-03-12 15:27:24.559 INFO 283436 --- [pool-1-thread-1] c.b.s.DebeziumSourceEventListener : Key = 'Struct{id={"$oid": "640d7d9dc625d06f305a0e93"}}' value = 'null'
[1] Indicates the resume token information
[2] Deleted event 1: key/value pair with a value
[3] Deleted event 2: key/value pair without a value (the value is null)
Why do we receive 2 events for a deletion? I'm not sure, but the second one is not useful for my use case, since I need the source and op information in the value to do anything meaningful.
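The second event is a tombstone: by default, Debezium follows each delete event with a record that has the same key and a null value, so that Kafka log compaction can eventually drop every record for that key. With the embedded engine there is no Kafka topic to compact, so the tombstone can simply be skipped, or switched off with the connector property tombstones.on.delete set to false. As a rough picture of how the op field could then drive the Elasticsearch side, which this post does not cover yet, here is a minimal sketch using only the JDK's java.net.http request builder. It assumes an unsecured Elasticsearch reachable at a base URL such as http://localhost:9200 and a hypothetical persons index; ChangeRouting and its parameters are illustrative, not a Debezium or Elasticsearch API.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Optional;

// Hypothetical router: turns one change event into an Elasticsearch request, or nothing.
final class ChangeRouting {
    private ChangeRouting() {}

    // op: the "op" field of the value Struct ("c", "u", "d", "r")
    // valueIsNull: true for the tombstone that follows a delete
    // id: document id, e.g. the ObjectId hex taken from the key
    // afterJson: the "after" document as JSON, only used for c/u/r
    static Optional<HttpRequest> toRequest(String baseUrl, String index, String op,
                                           boolean valueIsNull, String id, String afterJson) {
        if (valueIsNull || op == null) {
            return Optional.empty(); // tombstone: nothing to do without Kafka
        }
        URI uri = URI.create(baseUrl + "/" + index + "/_doc/" + id);
        switch (op) {
            case "c": // create
            case "u": // update (the update log above shows the full document in "after")
            case "r": // read, emitted while snapshotting existing documents
                // PUT /<index>/_doc/<id> creates or replaces the document
                return Optional.of(HttpRequest.newBuilder(uri)
                        .header("Content-Type", "application/json")
                        .PUT(HttpRequest.BodyPublishers.ofString(afterJson))
                        .build());
            case "d":
                // DELETE /<index>/_doc/<id> removes the document
                return Optional.of(HttpRequest.newBuilder(uri).DELETE().build());
            default:
                return Optional.empty();
        }
    }
}
```

Sending a request would then be a matter of HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). One caveat: the after document in the logs above contains an _id field, and Elasticsearch rejects _id inside a document body, so it would have to be removed or renamed before indexing.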
Conclusion
It wasn't as difficult as I thought to get started, but it did take me a day of googling around, reading the documentation, and understanding the concepts behind Debezium. I think I'm only scratching the surface, but it's a start!
I think using the embedded engine is a good starting point to determine whether this works for you or your team, without worrying about setting up the entire infrastructure (i.e. Kafka, Kafka Connect), while still achieving the same outcome.
Once you are sure about diving into it, or when you need to scale, that may be the time to set up the proper infrastructure to support it.
In my next blog post, I will be writing about how to decode the event message. Stay tuned!
Source Code
As usual, the full source code is available on GitHub