Configure Content-Based Routing to Kafka Topics with YugabyteDB’s Change Data Capture
In today’s data-driven world, companies are constantly looking for ways to process and analyze data in real time to gain insights and make informed decisions. However, as data volumes grow and compliance requirements increase, efficiently managing and routing data to the appropriate systems and applications has become a daunting task.
As a result, content-based routing has become a key messaging pattern. Instead of sending every message to the same destination, a content-based router inspects each message and chooses its destination based on the message’s content. This gives precise control over where data goes, which is especially useful when multiple consumers each need to process a different subset of the data in a topic.
This blog post shows you how to configure content-based routing for the YugabyteDB Change Data Capture (CDC) connector. Let’s walk through an example of how content-based routing can help with GDPR compliance requirements that call for geolocating data based on a user’s location. In the example, we reroute database events for users whose country is set to a specific value to a different Kafka Topic that has a partition in that country.
YugabyteDB Change Data Capture and Kafka Topics
To demonstrate content-based routing, we’ll use two key technologies: Apache Kafka as our streaming platform and YugabyteDB as our core relational database. For readers not familiar with these products, here’s a quick overview of each, focusing on how they fit into a streaming and change data capture setup.
- YugabyteDB is an open source, distributed SQL database built for scalable, distributed transactional applications. Change data capture (CDC) in YugabyteDB ensures that changes to data from operations such as inserts, updates, and deletes are identified, captured, and either automatically applied to another data repository instance or made available for consumption by applications and other tools. Debezium is used as the connector that consumes the events generated by CDC. By default, the YugabyteDB Debezium CDC connector streams all change events from a table into a single Kafka Topic. Later we’ll see how you can reroute events to other Kafka Topics based on each event’s content (that is, content-based routing).
- Apache Kafka is a powerful open-source distributed streaming platform that excels at handling high volumes of data in real time. It provides reliable, scalable, high-performance data processing for mission-critical applications. To manage data streams between Kafka and other systems, the Confluent Platform offers a range of tools and connectors. In this example, the YugabyteDB Source Connector moves data from YugabyteDB into Kafka. Confluent Platform helps you operate efficiently at scale, which makes it a good fit for this illustration.
Now let’s look at how to bring these pieces together to build a content-based routing system.
How to Set Up Content-Based Routing
Step 1: Include necessary dependencies in the CDC connector
Content-based routing requires a few dependencies that are not included in the official yugabyte-debezium-connector for security reasons. Specifically:
- Debezium routing SMT (Single Message Transform)
- Groovy JSR223 implementation (or GraalVM JavaScript JSR 223 implementation)
To add these dependencies to the Kafka Connect environment, rebuild the yugabyte-debezium-connector image with both of the plugins listed above.
To get started, use the Dockerfile and docker-compose configuration in our GitHub cdc-examples repository.
Here’s what the Dockerfile would look like:
FROM quay.io/yugabyte/debezium-connector:latest
# Add the required jar files for content based routing
RUN cd $KAFKA_CONNECT_YB_DIR && curl -so debezium-scripting-2.1.2.Final.jar https://repo1.maven.org/maven2/io/debezium/debezium-scripting/2.1.2.Final/debezium-scripting-2.1.2.Final.jar
RUN cd $KAFKA_CONNECT_YB_DIR && curl -so groovy-4.0.9.jar https://repo1.maven.org/maven2/org/apache/groovy/groovy/4.0.9/groovy-4.0.9.jar
RUN cd $KAFKA_CONNECT_YB_DIR && curl -so groovy-jsr223-4.0.9.jar https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/4.0.9/groovy-jsr223-4.0.9.jar
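Each RUN line downloads a jar from Maven Central, and the URL follows directly from the artifact’s group ID, artifact ID, and version. If you want to try other versions, a small bash helper such as the hypothetical `maven_jar_url` below can derive the URL from those coordinates. It is a convenience sketch, not part of the cdc-examples repository, and it does not check which versions are compatible with the connector.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build a Maven Central jar URL from Maven coordinates.
#   $1 groupId (e.g. io.debezium), $2 artifactId, $3 version
maven_jar_url() {
  local group=$1 artifact=$2 version=$3
  # Maven Central stores artifacts under <groupId with dots as slashes>/<artifactId>/<version>/
  printf 'https://repo1.maven.org/maven2/%s/%s/%s/%s-%s.jar\n' \
    "${group//.//}" "$artifact" "$version" "$artifact" "$version"
}
```

For example, `maven_jar_url org.apache.groovy groovy 4.0.9` yields the URL used in the second RUN line. After editing the Dockerfile, you can build the image with `docker build -t <image-name> .` and use that image in the next step.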
Step 2: Set up the CDC environment
Set up your CDC environment by deploying and configuring Kafka and the yugabyte-debezium-connector. Use the Docker image built in Step 1 so that the connector includes the dependencies content-based routing needs.
Next, create a CDC stream with the before image enabled, using the command below. IMPLICIT sets the checkpointing type, and ALL sets the record type so that events include the before image.
./yb-admin --master_addresses <master-addresses> create_change_data_stream ysql.<namespace> IMPLICIT ALL
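The command prints the ID of the new stream, which the connector configuration needs as `database.streamid`. The hypothetical bash function below extracts that ID from the yb-admin output. It assumes the output contains a line of the form `CDC Stream ID: <id>`; check that format against the yb-admin version you run.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read yb-admin output on stdin and print the stream ID.
# Assumes the ID appears on a line like "CDC Stream ID: <id>".
extract_stream_id() {
  # Split on ":" plus any following spaces; print the text after the label
  # from the first matching line, then stop.
  awk -F': *' '/^CDC Stream ID:/ { print $2; exit }'
}
```

For example, `STREAM_ID=$(./yb-admin --master_addresses <master-addresses> create_change_data_stream ysql.yugabyte IMPLICIT ALL | extract_stream_id)` would capture the ID for the `yugabyte` database used in the connector configuration. That configuration reads the stream ID from `$1`, which suggests it runs inside a script; you could pass `$STREAM_ID` as that script’s first argument.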
Step 3: Deploy a connector with content-based routing enabled
Below is an example connector configuration with content-based routing enabled. In this example, change events for rows in the “public.users” table whose country is set to “UK”, “India”, or “USA” are routed to the Kafka Topics uk_users, india_users, and usa_users, respectively.
All the other events will still be streamed to the default “ybconnector.public.users” topic.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  localhost:8083/connectors/ -d '{
  "name": "ybconnector",
  "config": {
    "tasks.max": "2",
    "connector.class": "io.debezium.connector.yugabytedb.YugabyteDBConnector",
    "database.hostname": "'$NODE'",
    "database.master.addresses": "'$MASTERS'",
    "database.port": "5433",
    "database.user": "yugabyte",
    "database.password": "Yugabyte@123",
    "database.dbname": "yugabyte",
    "database.server.name": "ybconnector",
    "snapshot.mode": "initial",
    "database.streamid": "'$1'",
    "table.include.list": "public.users",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://schema-registry:8081",
    "transforms": "route1,route2,route3",
    "transforms.route1.type": "io.debezium.transforms.ContentBasedRouter",
    "transforms.route1.language": "jsr223.groovy",
    "transforms.route1.topic.expression": "value.after != null ? (value.after?.country?.value == '\''UK'\'' ? '\''uk_users'\'' : null) : (value.before?.country?.value == '\''UK'\'' ? '\''uk_users'\'' : null)",
    "transforms.route2.type": "io.debezium.transforms.ContentBasedRouter",
    "transforms.route2.language": "jsr223.groovy",
    "transforms.route2.topic.expression": "value.after != null ? (value.after?.country?.value == '\''India'\'' ? '\''india_users'\'' : null) : (value.before?.country?.value == '\''India'\'' ? '\''india_users'\'' : null)",
    "transforms.route3.type": "io.debezium.transforms.ContentBasedRouter",
    "transforms.route3.language": "jsr223.groovy",
    "transforms.route3.topic.expression": "value.after != null ? (value.after?.country?.value == '\''USA'\'' ? '\''usa_users'\'' : null) : (value.before?.country?.value == '\''USA'\'' ? '\''usa_users'\'' : null)"
  }
}'
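Each router needs three nearly identical keys, and writing the Groovy expressions inside a single-quoted curl body is what forces the `'\''` escapes. As a sketch, a bash function can generate the `transforms` entries from COUNTRY:TOPIC pairs, producing the same expressions as the configuration above. The `route_transforms` function is hypothetical and not part of any Debezium or YugabyteDB tooling.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the "transforms" members of the connector config,
# one ContentBasedRouter per COUNTRY:TOPIC argument (e.g. UK:uk_users).
# Output is a comma-separated list of JSON members without surrounding braces.
route_transforms() {
  local i names="" pair country topic expr
  # First pass: the comma-separated list of router names (route1,route2,...).
  for ((i = 1; i <= $#; i++)); do
    names+="${names:+,}route$i"
  done
  printf '"transforms":"%s"' "$names"
  # Second pass: type, language, and topic.expression for each router.
  i=0
  for pair in "$@"; do
    i=$((i + 1))
    country=${pair%%:*}
    topic=${pair#*:}
    # Same shape as the article's expression: check the after image, and
    # fall back to the before image when there is none (deletes).
    expr="value.after != null ? (value.after?.country?.value == '$country' ? '$topic' : null) : (value.before?.country?.value == '$country' ? '$topic' : null)"
    printf ',\n"transforms.route%d.type":"io.debezium.transforms.ContentBasedRouter"' "$i"
    printf ',\n"transforms.route%d.language":"jsr223.groovy"' "$i"
    printf ',\n"transforms.route%d.topic.expression":"%s"' "$i" "$expr"
  done
  printf '\n'
}
```

Because the output has no surrounding braces, it could be spliced into the `config` object by closing the single-quoted body, inserting `"$(route_transforms UK:uk_users India:india_users USA:usa_users)"`, and reopening the body, the same way the command already inserts `$NODE`. Country or topic names containing quotes would need extra escaping, which this sketch does not handle.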
The configuration that controls content re-routing
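Look at the value of “transforms.route1.topic.expression” in the configuration above. In the curl command, each '\'' ends the single-quoted JSON body, inserts a literal single quote, and reopens the body. With that shell quoting removed, the Groovy expression reads:
value.after != null ? (value.after?.country?.value == 'UK' ? 'uk_users' : null) : (value.before?.country?.value == 'UK' ? 'uk_users' : null)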
This expression first checks whether the event has an after image, that is, whether value.after is not null. If it does, the expression returns “uk_users” when the row’s country after the operation is “UK”, and null otherwise. If there is no after image, as in a “delete” operation, the expression applies the same check to the row’s values before the operation; for deletes, this relies on the before image enabled on the stream in Step 2. The safe-navigation operator (?.) makes the expression return null instead of failing when a field along the path is null.
The value that is returned determines which Kafka Topic receives the rerouted event. If the expression returns null, the event is not rerouted and stays on the default topic.
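To make the decision logic concrete, here is a bash function that mirrors what the three routers do for a single event. It is a simulation for reasoning about the expressions, not how Debezium evaluates them. It takes Debezium’s operation code (`c`, `u`, `d`, or `r`) plus the country from the before and after images, and it assumes that only delete events lack an after image.

```shell
#!/usr/bin/env bash
# Hypothetical simulation of route1..route3 for one change event.
# This is NOT how Debezium evaluates the expressions; it only mirrors the logic.
#   $1  op code: c (create), u (update), d (delete), r (snapshot read)
#   $2  country in the before image (may be empty)
#   $3  country in the after image (may be empty)
#   $4  default topic (optional)
resolve_topic() {
  local op=$1 before=$2 after=$3 default=${4:-ybconnector.public.users}
  local country
  # "value.after != null ? ... : ...": assume only deletes lack an after image,
  # so deletes are routed on the before image and everything else on the after image.
  if [ "$op" = "d" ]; then
    country=$before
  else
    country=$after
  fi
  # The countries are mutually exclusive, so running the routers in sequence
  # gives the same result as a single lookup.
  case "$country" in
    UK) echo "uk_users" ;;
    India) echo "india_users" ;;
    USA) echo "usa_users" ;;
    *) echo "$default" ;;  # every router returned null: topic is unchanged
  esac
}
```

One consequence worth noting: `resolve_topic u UK France` prints `ybconnector.public.users`. An update that moves a user out of the UK is routed on the after image, so it lands on the default topic rather than on uk_users.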
NOTE: In this example, we rerouted the events for three countries (i.e. UK, India and USA). We could have achieved this using a single expression, but for the sake of illustration and readability we created three different routers. For more advanced routing configuration, you can refer to Debezium’s official documentation on content-based routing.
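As the note says, the three routes could also be expressed in a single router. One way to do that is a Groovy map lookup keyed by country, and the hypothetical bash helper below generates such an expression from COUNTRY:TOPIC pairs. The expression has not been tested against a running connector. It assumes standard Groovy semantics: the Elvis operator `?:` picks the after image when it is non-null and otherwise the before image, and looking up a missing or null key in a Groovy map returns null, which leaves the event on its default topic.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build ONE Groovy topic.expression that routes several
# countries, instead of one ContentBasedRouter per country.
# Arguments are COUNTRY:TOPIC pairs, e.g. UK:uk_users.
single_route_expression() {
  local pair entries=""
  for pair in "$@"; do
    # Append a 'COUNTRY': 'TOPIC' entry to the Groovy map literal.
    entries+="${entries:+, }'${pair%%:*}': '${pair#*:}'"
  done
  # Take the after image, or the before image when there is none (deletes),
  # and look up its country. A missing or null key yields null, so the event
  # would keep its default topic (assumed Groovy semantics, untested here).
  printf "[%s][(value.after ?: value.before)?.country?.value]\n" "$entries"
}
```

The resulting string would replace the three routers with a single `transforms.route1.topic.expression`, with `transforms.route1.language` still set to `jsr223.groovy`. Inside the curl body, its single quotes would need the same `'\''` escaping as before.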
Step 4: Verify the message routing
You can verify the message routing and connector deployment by performing CRUD operations on the “users” table. Events for users whose country is set to “UK”, “India”, or “USA” should appear in the new uk_users, india_users, and usa_users topics, while all other events continue to go to “ybconnector.public.users”.
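To check the topics from the command line, you can list them with Kafka’s `kafka-topics` tool (`kafka-topics.sh` in an Apache Kafka distribution) using `--bootstrap-server <broker> --list`, and pipe the listing into a small check. The `check_topics` function below is a hypothetical helper that reports which expected topics are present. The routed topics only appear after matching events have been produced, assuming the broker auto-creates topics or they were created in advance.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a topic listing (one topic per line) on stdin and
# report which of the expected topics (arguments) are present.
# Returns 0 if all are present, 1 otherwise.
check_topics() {
  local listing topic missing=0
  listing=$(cat)
  for topic in "$@"; do
    # -x: whole-line match, -F: treat the topic name as a fixed string
    if printf '%s\n' "$listing" | grep -qxF -- "$topic"; then
      echo "found $topic"
    else
      echo "missing $topic"
      missing=1
    fi
  done
  return "$missing"
}
```

For example, `kafka-topics --bootstrap-server localhost:9092 --list | check_topics uk_users india_users usa_users`, where `localhost:9092` is an assumed broker address. Since the configuration above uses Avro converters, reading the routed events themselves calls for an Avro-aware consumer such as Confluent’s `kafka-avro-console-consumer` pointed at the schema registry.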
Conclusion
In this post, we walked through the process of rerouting change data capture (CDC) events from a table to different Kafka Topics based on the event content.