Spring Boot Debezium (embedded) Engine with MongoDB as Source - Part 1

I am exploring Debezium as a Change Data Capture (CDC) tool to introduce into my team/project. I wanted to keep it simple, so I chose to go with the embedded engine.

I wasn't able to find many examples that use MongoDB as the source database (especially not with the embedded engine). As a result, I had to scour various documentation and blogs to figure out how to configure Spring Boot with the Debezium (embedded) Engine, with MongoDB as the source and Elasticsearch as the destination.

I hope this will serve as a guide to others who face the same issue as me.

End Goal

What I wanted to achieve is pretty straightforward:

  • host an embedded Debezium engine on top of Spring Boot

  • connect to a MongoDB instance (with a replica set) through debezium-connector-mongodb

  • subscribe to the change events (RecordChangeEvent)

  • index the changes into Elasticsearch

Setup Project

Prerequisite

  • MongoDB Server

  • Elasticsearch Server

    • Local or Cloud

As I had trouble configuring a local MongoDB with a replica set, I am using a free instance from MongoDB Atlas instead. Elasticsearch, on the other hand, is straightforward to run locally; I spin it up via a docker-compose script.

It is a bummer that MongoDB Atlas offers only version 5.x for free instances, rather than the latest and greatest version. This means that I am not able to fully test the Change Streams with Document Pre- and Post-Images feature, which is only available from version 6.0 onwards.

Initialize a spring boot project

Go to start.spring.io and generate your Spring Boot project, or click on this link for the preset that I've configured.

While it is not necessary to bring in spring-boot-starter-web, it is an easy way to pull in the necessary dependencies. We don't need the application to run as a web application, so we can turn that off by adding the following configuration (e.g. to application.yml):

spring:
  main:
    web-application-type: none

Note that at the time of writing, while Spring Boot 3.x is available, I am still using Spring Boot 2.7.9 for my setup.

Add Debezium dependencies

Following the Debezium documentation, we need to add the following dependencies:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

At the time of writing, I am using Debezium 2.1.2.Final.

As I am connecting to MongoDB as my source, I need to add the connector dependency as well:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mongodb</artifactId>
    <version>${version.debezium}</version>
</dependency>

Override Mongo Version

If you are also using Spring Boot 2.7.x, make sure you override the MongoDB driver version to at least 4.7.1, which Debezium 2.1.2.Final depends on. To override it, simply add mongodb.version under the properties section in pom.xml:

<properties>
    <!--
        Spring Boot 2.7.9 brings in mongo 4.6.1 but debezium relies on 4.7.1
        Manually define to prevent debezium from using 4.6.1
    -->
    <mongodb.version>4.7.1</mongodb.version>
    <version.debezium>2.1.2.Final</version.debezium>
</properties>

Initially, I thought there was a bug in Debezium, not realizing that the driver version was being overridden by Spring Boot's dependency management. Don't make the same rookie mistake as me.

Debezium Configuration

As I understand it, there are two types of properties: engine and connector.

Engine

These properties must be set no matter which connector you use with Debezium.

Connector

These properties are specific to the database you are connecting to, so refer to the respective connector documentation for what is required.

There is a huge list of properties that can be set, but I am only configuring the minimum needed to get it working. Many of the defaults are sensible, so there is no need to override them unless you have to.

import java.io.File;
import java.io.IOException;
import org.springframework.context.annotation.Bean;
import io.debezium.config.Configuration;

@org.springframework.context.annotation.Configuration
public class DebeziumConnectorConfig {
    // based off https://debezium.io/documentation/reference/2.1/connectors/mongodb.html#mongodb-connector-properties
    @Bean
    public Configuration mongodbConnector() throws IOException {
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");

        return Configuration.create()
                // engine properties
                .with("name", "sbd-mongodb")
                .with("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("offset.flush.interval.ms", "60000")
                // connector specific properties
                .with("mongodb.connection.string", "mongodb+srv://sb-debezium.1ewsyzd.mongodb.net")
                .with("topic.prefix", "sbd-mongodb-connector")
                .with("mongodb.user", "bwgjoseph")
                .with("mongodb.password", "<redacted>")
                .with("mongodb.ssl.enabled", "true") // default false
                .with("database.include.list", "source") // default empty
                .with("snapshot.delay.ms", "100")
                .with("errors.log.include.messages", "true")
                .build();
    }
}

A few points to note:

  • name has to be unique across the connectors

  • mongodb.hosts is not required if mongodb.connection.string is configured

  • offset.storage is necessary to persist offsets. For this demo, it is fine to create the offsets file in a temporary folder, but in practice it should be stored on a persistent volume

  • mongodb.ssl.enabled must be set to true if connecting to MongoDB Atlas
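Outside a demo, the offsets file should survive restarts and the password should not be hard-coded. The following is a minimal sketch, not part of the original project: a hypothetical DebeziumMongoProperties helper that builds the same settings as plain java.util.Properties (which DebeziumEngine.Builder#using(Properties) accepts, and which Configuration.from(Properties) can wrap), keeps the offsets file in a fixed directory that you would mount as a persistent volume, and reads credentials from the environment. The class name and the MONGODB_USER / MONGODB_PASSWORD variable names are assumptions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper (not from the original project): same engine/connector
// settings as the @Bean above, but with a persistent offsets directory and
// credentials read from an environment map (names are assumptions).
final class DebeziumMongoProperties {

    private DebeziumMongoProperties() {
    }

    static Properties build(Map<String, String> env, Path offsetDir) throws IOException {
        // create the directory (e.g. a mounted persistent volume) if it is missing
        Files.createDirectories(offsetDir);
        Path offsetFile = offsetDir.resolve("offsets.dat");

        Properties props = new Properties();
        // engine properties
        props.setProperty("name", "sbd-mongodb");
        props.setProperty("connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", offsetFile.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "60000");
        // connector specific properties
        props.setProperty("mongodb.connection.string", "mongodb+srv://sb-debezium.1ewsyzd.mongodb.net");
        props.setProperty("topic.prefix", "sbd-mongodb-connector");
        props.setProperty("mongodb.user", required(env, "MONGODB_USER"));
        props.setProperty("mongodb.password", required(env, "MONGODB_PASSWORD"));
        props.setProperty("mongodb.ssl.enabled", "true");
        props.setProperty("database.include.list", "source");
        props.setProperty("snapshot.delay.ms", "100");
        props.setProperty("errors.log.include.messages", "true");
        return props;
    }

    // fail fast instead of starting the connector without credentials
    private static String required(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable " + key);
        }
        return value;
    }
}
```

For example, the @Bean could return Configuration.from(DebeziumMongoProperties.build(System.getenv(), Paths.get("/data/debezium"))), where /data/debezium is a hypothetical mount point.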

Setup Debezium Service

Now that we have everything set up, it is time to configure the actual service to start and listen for changes.

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Service;

import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class DebeziumSourceEventListener {

    // single thread that runs the engine (DebeziumEngine is a Runnable)
    private final ExecutorService executor;
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumSourceEventListener(Configuration mongodbConnector) {
        this.executor = Executors.newSingleThreadExecutor();
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
            .using(mongodbConnector.asProperties())
            .notifying(this::handleChangeEvent)
            .build();
    }

    // called by the engine for every change event captured from MongoDB
    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
        Struct sourceRecordKey = (Struct) sourceRecord.key();
        // the value is null for the tombstone event that follows a delete
        Struct sourceRecordValue = (Struct) sourceRecord.value();

        log.info("Key = '{}' value = '{}'", sourceRecordKey, sourceRecordValue);
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (this.debeziumEngine != null) {
            // ask the engine to stop
            this.debeziumEngine.close();
        }
        // release the engine thread, waiting briefly for run() to return
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

A quick demo

Let's imagine you have another service (endpoint) that creates a Person and saves it into MongoDB:

import java.util.List;

import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.TypeAlias;
import org.springframework.data.mongodb.core.mapping.Document;

import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

@Getter
@ToString
@Builder(toBuilder = true)
@TypeAlias(value = "person")
@Document(collection = "persons")
public class Person {
    @Id
    private String id;
    private String name;
    private String description;
    private List<String> hashTags;
}

Start

Now, start the service:

./mvnw spring-boot:run

Create

When a new document is created, the following logs will be shown:

[1] 2023-03-12 15:22:05.819  INFO 281440 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask      : 1 records sent during previous 00:01:50.619, last recorded offset of {rs=atlas-b15fi5-shard-0, server_id=sbd-mongodb-connector} partition is {sec=1678605725, ord=4, transaction_id=null, resume_token=82640D7D9D000000042B022C0100296E5A1004CCDB188FE00C4DF2852FF333EE031E5646645F69640064640D7D9DC625D06F305A0E930004}
[2] 2023-03-12 15:22:05.819  INFO 281440 --- [pool-1-thread-1] c.b.s.DebeziumSourceEventListener        : Key = 'Struct{id={"$oid": "640d7d9dc625d06f305a0e93"}}' value = 'Struct{after={"_id": {"$oid": "640d7d9dc625d06f305a0e93"},"name": "joseph","description": "hello world","hashTags": ["hello","world"]},source=Struct{version=2.1.2.Final,connector=mongodb,name=sbd-mongodb-connector,ts_ms=1678605725000,db=source,rs=atlas-b15fi5-shard-0,collection=persons,ord=4},op=c,ts_ms=1678605725586}'
  1. Indicates the resume token information

  2. Created event key/value pair
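The id in the key is MongoDB Extended JSON, as in {"$oid": "640d7d9dc625d06f305a0e93"}. If you want to reuse the same id as the Elasticsearch document id, a small helper can extract the hex string. This is a hypothetical sketch (MongoKeyIds is not part of Debezium), assuming the _id is an ObjectId and that the key's id field is read as a string, for example with ((Struct) sourceRecord.key()).getString("id").

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the 24-hex-digit ObjectId from the key's id
// field, which the logs show as Extended JSON, e.g. {"$oid": "640d..."}.
final class MongoKeyIds {

    private static final Pattern OID =
            Pattern.compile("\"\\$oid\"\\s*:\\s*\"([0-9a-fA-F]{24})\"");

    private MongoKeyIds() {
    }

    // empty when the _id is not an ObjectId (e.g. a plain string or number)
    static Optional<String> objectIdHex(String keyIdJson) {
        if (keyIdJson == null) {
            return Optional.empty();
        }
        Matcher m = OID.matcher(keyIdJson);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }
}
```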

Update

Let's update the name field. Once updated, the following logs will be shown:

[1] 2023-03-12 15:26:43.407  INFO 283436 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask      : 1 records sent during previous 00:00:23.096, last recorded offset of {rs=atlas-b15fi5-shard-0, server_id=sbd-mongodb-connector} partition is {sec=1678606003, ord=5, transaction_id=null, resume_token=82640D7EB3000000052B022C0100296E5A1004CCDB188FE00C4DF2852FF333EE031E5646645F69640064640D7D9DC625D06F305A0E930004}
[2] 2023-03-12 15:26:43.410  INFO 283436 --- [pool-1-thread-1] c.b.s.DebeziumSourceEventListener        : Key = 'Struct{id={"$oid": "640d7d9dc625d06f305a0e93"}}' value = 'Struct{after={"_id": {"$oid": "640d7d9dc625d06f305a0e93"},"name": "bwgjoseph","description": "hello world","hashTags": ["hello","world"]},updateDescription=Struct{updatedFields={"name": "bwgjoseph"}},source=Struct{version=2.1.2.Final,connector=mongodb,name=sbd-mongodb-connector,ts_ms=1678606003000,db=source,rs=atlas-b15fi5-shard-0,collection=persons,ord=5},op=u,ts_ms=1678606003351}'
  1. Indicates the resume token information

  2. Updated event key/value pair

    1. where we can see the name is now bwgjoseph (changed from joseph)

Delete

When the document is deleted, the listener receives two events:

[1] 2023-03-12 15:27:24.555  INFO 283436 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask      : 2 records sent during previous 00:00:41.148, last recorded offset of {rs=atlas-b15fi5-shard-0, server_id=sbd-mongodb-connector} partition is {sec=1678606044, ord=3, transaction_id=null, resume_token=82640D7EDC000000032B022C0100296E5A1004CCDB188FE00C4DF2852FF333EE031E5646645F69640064640D7D9DC625D06F305A0E930004}
[2] 2023-03-12 15:27:24.557  INFO 283436 --- [pool-1-thread-1] c.b.s.DebeziumSourceEventListener        : Key = 'Struct{id={"$oid": "640d7d9dc625d06f305a0e93"}}' value = 'Struct{source=Struct{version=2.1.2.Final,connector=mongodb,name=sbd-mongodb-connector,ts_ms=1678606044000,db=source,rs=atlas-b15fi5-shard-0,collection=persons,ord=3},op=d,ts_ms=1678606044425}'   
[3] 2023-03-12 15:27:24.559  INFO 283436 --- [pool-1-thread-1] c.b.s.DebeziumSourceEventListener        : Key = 'Struct{id={"$oid": "640d7d9dc625d06f305a0e93"}}' value = 'null'
  1. Indicates the resume token information

  2. Deleted Event 1 key/value pair with value

  3. Deleted Event 2 key/value pair without value

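Why do we receive two events for a deletion? The second one is a tombstone event (same key, null value). By default (tombstones.on.delete=true), Debezium follows every delete event with a tombstone so that Kafka log compaction can eventually remove all events for that key. It is not useful for my use case, since I need the source and op information to do something meaningful; setting tombstones.on.delete to false stops the connector from emitting it.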
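For the Elasticsearch end goal, a listener has to decide what to do with each event based on its op code (c, u and d in the logs above; r is used for snapshot reads) and whether it has a value at all. This is a hypothetical sketch, not part of Debezium's API; in handleChangeEvent you could call it as IndexAction.forEvent(value == null ? null : value.getString("op"), value != null), where value is the record's Struct value.

```java
// Hypothetical mapping (not part of Debezium's API) from the op codes seen in
// the logs to the action an Elasticsearch sink would take.
enum IndexAction {
    UPSERT, DELETE, SKIP;

    // op: the "op" field of the change event value; hasValue is false for tombstones
    static IndexAction forEvent(String op, boolean hasValue) {
        if (!hasValue || op == null) {
            // tombstone: the preceding op=d event already carried the delete
            return SKIP;
        }
        switch (op) {
            case "r": // snapshot read
            case "c": // create
            case "u": // update
                return UPSERT;
            case "d": // delete
                return DELETE;
            default:
                return SKIP;
        }
    }
}
```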

Conclusion

It wasn't as difficult as I thought to get started, but it did take me a day of googling around, reading the documentation, and understanding the concepts behind Debezium. I'm only scratching the surface, but it's a start!

I think using the embedded engine is a good starting point to determine whether this works for you or your team, without worrying about setting up the entire infrastructure (i.e. Kafka, Kafka Connect), while still achieving the same outcome.

When you are ready to commit to it, or when you need to scale, that may be the time to set up the proper infrastructure to support it.

In my next blog post, I will be writing about how to decode the event message. Stay tuned!

Source Code

As usual, the full source code is available on GitHub
