Configuring Change Data Capture (CDC) Mode
Overview
DataHub supports CDC (Change Data Capture) mode for generating MetadataChangeLogs (MCLs). In CDC mode, MCLs are generated from database change events rather than being produced directly by GMS. This provides stronger ordering guarantees - MCLs are generated in the exact order of database transaction commits. Use CDC mode if you need strict ordering guarantees for metadata changes.
The default deployments do not use CDC; explicit configuration is required to switch to CDC mode.
Prerequisites
Before enabling CDC mode, ensure you have:
- Supported Database: MySQL 5.7+ or PostgreSQL 10+
- Kafka Connect: Running Kafka Connect with Debezium connector plugin installed
- Database Permissions: Ability to enable replication and create CDC users
- Kafka Resources: Sufficient resources for additional CDC topic
Architecture
In CDC mode:
MCP → GMS (writes to DB, no MCL emission)
↓
Database Change
↓
Debezium Connector (captures changes)
↓
CDC Topic (datahub.metadata_aspect_v2)
↓
MCE Consumer (reads CDC events, generates MCLs via EntityService)
↓
MCL Topics (MetadataChangeLog_Versioned_v1)
Configuration Overview
CDC mode requires configuration in three layers:
- Database Layer: Enable replication, create CDC user, configure tables
- Debezium Layer: Configure Kafka Connect connector (auto or manual)
- DataHub Layer: Enable CDC processing in GMS and MCE Consumer
Database Configuration
For development environments: DataHub can automate database setup via the system-update service when CDC_CONFIGURE_SOURCE=true. The system-update service will create the necessary CDC user and configure database settings during startup.
For production environments: Manual database configuration is recommended for better control and security practices specific to your environment.
MySQL Requirements
Your MySQL instance must be configured to support CDC:
Enable Binary Logging: Configure MySQL to use ROW-based binary logging with FULL row images. This allows Debezium to capture all column changes.
Create CDC User: Create a dedicated database user for CDC with the following capabilities:
- Read access to the DataHub database (specifically the metadata_aspect_v2 table)
- Replication client privileges to read binary logs
- Replication slave privileges to monitor replication status
- Database introspection privileges to discover schema changes
Configure Binary Log Retention: Set appropriate binary log retention based on your operational needs and disaster recovery requirements. Consider factors like Debezium downtime tolerance and disk space.
Server ID: Ensure each MySQL instance (including the CDC connector) has a unique server ID for replication topology.
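As a rough sketch, a manual setup along these lines satisfies the requirements above. It assumes a database named datahub, admin access, and the datahub_cdc user name used elsewhere in this guide; the exact privilege list can vary by MySQL version, so treat this as a starting point rather than a definitive recipe:
# Create the CDC user and grant the privileges described above
mysql -u root -p <<'SQL'
CREATE USER 'datahub_cdc'@'%' IDENTIFIED BY 'your_secure_password';
GRANT SELECT ON datahub.metadata_aspect_v2 TO 'datahub_cdc'@'%';
GRANT REPLICATION CLIENT, REPLICATION SLAVE, SHOW DATABASES ON *.* TO 'datahub_cdc'@'%';
FLUSH PRIVILEGES;
SQL
# Confirm ROW-based binary logging with FULL row images
mysql -u root -p -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image');"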
What DataHub automates (development mode):
- Creates CDC user with required permissions on the local database
- Validates binary logging configuration
- Configures Debezium connector
PostgreSQL Requirements
Your PostgreSQL instance must be configured to support logical replication:
Enable Logical Replication: Configure PostgreSQL WAL (Write-Ahead Log) level to 'logical'. This is required for CDC to capture row-level changes.
Configure Replication Slots: Allocate sufficient replication slots (at least one per CDC connector). Replication slots track the CDC consumer's position in the WAL.
Create CDC User: Create a dedicated database user for CDC with the following capabilities:
- Replication privileges to create and read from replication slots
- Connect privileges on the DataHub database
- Select privileges on the metadata_aspect_v2 table in the public schema
Set Replica Identity to FULL: Configure the metadata_aspect_v2 table to use FULL replica identity. This ensures Debezium receives complete before/after images of changed rows, which is critical for DataHub's MCL generation.
Create Publication: Create a PostgreSQL publication named dbz_publication for the metadata_aspect_v2 table. Publications define which tables participate in logical replication. DataHub expects this specific publication name when auto-configuring.
Create Replication Slot: When configuring manually, create a replication slot named debezium using the pgoutput plugin. DataHub's default configuration uses this slot name.
Configure WAL Senders: Ensure max_wal_senders is large enough for the number of CDC connectors and any other replication consumers.
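For reference, a minimal manual setup might look like the following. It assumes a database named datahub and superuser access; slot creation can also be left to Debezium on first connect:
# Check that logical replication is enabled (requires wal_level=logical)
psql -U postgres -c "SHOW wal_level;"
# Create the CDC user, publication, and replication slot described above
psql -U postgres -d datahub <<'SQL'
CREATE USER datahub_cdc WITH REPLICATION PASSWORD 'your_secure_password';
GRANT CONNECT ON DATABASE datahub TO datahub_cdc;
GRANT SELECT ON public.metadata_aspect_v2 TO datahub_cdc;
ALTER TABLE public.metadata_aspect_v2 REPLICA IDENTITY FULL;
CREATE PUBLICATION dbz_publication FOR TABLE public.metadata_aspect_v2;
SELECT pg_create_logical_replication_slot('debezium', 'pgoutput');
SQL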
What DataHub automates (development mode):
- Creates CDC user with required permissions
- Sets replica identity to FULL on metadata_aspect_v2 table
- Creates publication named dbz_publication for the table
- Creates replication slot named debezium
- Validates logical replication configuration
- Configures Debezium connector with correct publication and slot names
Important PostgreSQL Notes:
- Replication slots prevent WAL cleanup. Monitor disk space and configure max_slot_wal_keep_size to prevent runaway growth.
- When a CDC connector is stopped, consider dropping its replication slot to allow WAL cleanup.
- Database restarts may be required to change WAL-related configuration parameters.
- The publication name dbz_publication and slot name debezium are DataHub's defaults - if you use different names, update your Debezium connector configuration accordingly.
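To keep an eye on slot-driven WAL retention, you can query the standard pg_replication_slots system view, as in this sketch:
# Show each slot's status and how much WAL it is retaining
psql -U postgres -d datahub -c "
  SELECT slot_name, active,
         pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
  FROM pg_replication_slots;"
# Drop the slot only if the connector has been permanently removed
psql -U postgres -d datahub -c "SELECT pg_drop_replication_slot('debezium');"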
Debezium Configuration
You have two options for configuring Debezium:
Option 1: Auto-Configuration (Development/Testing)
For development and testing environments, DataHub can automatically configure Debezium via the system-update service.
Environment Variables:
# Enable CDC processing
CDC_MCL_PROCESSING_ENABLED=true
# Enable auto-configuration
CDC_CONFIGURE_SOURCE=true
# Database type (mysql or postgres)
CDC_DB_TYPE=mysql
# CDC user credentials (user will be created if it doesn't exist)
CDC_USER=datahub_cdc
CDC_PASSWORD=your_secure_password
# Kafka Connect endpoint
CDC_KAFKA_CONNECT_URL=http://kafka-connect:8083
# Connector name
DATAHUB_CDC_CONNECTOR_NAME=datahub-cdc-connector
# CDC topic name
CDC_TOPIC_NAME=datahub.metadata_aspect_v2
Docker Compose Example:
datahub-gms:
  environment:
    - CDC_MCL_PROCESSING_ENABLED=true
mce-consumer:
  environment:
    - CDC_MCL_PROCESSING_ENABLED=true
datahub-upgrade:
  environment:
    - CDC_MCL_PROCESSING_ENABLED=true
    - CDC_CONFIGURE_SOURCE=true
    - CDC_DB_TYPE=mysql # or postgres
    - CDC_USER=datahub_cdc
    - CDC_PASSWORD=your_secure_password
    - CDC_KAFKA_CONNECT_URL=http://kafka-connect:8083
Option 2: Manual Configuration (Production)
For production environments, manually configure Debezium Kafka Connect for better control.
Key Configuration Requirements:
Regardless of database type, your Debezium connector must be configured with the following critical settings:
Table to Capture: The connector must capture changes from the metadata_aspect_v2 table only. This is the primary storage table for DataHub's versioned aspects.
Output Topic Name: CDC events must be routed to a single consolidated topic (default: datahub.metadata_aspect_v2). Use Debezium's routing transformation to override the default per-table topic naming.
Message Format:
- Key Converter: Use StringConverter - keys are entity URNs as plain strings
- Value Converter: Use JsonConverter - CDC payloads must be in JSON format
- Schemas Disabled: Set value.converter.schemas.enable: false - DataHub validates aspects internally and doesn't use the Kafka schema registry for CDC events
Partitioning Key: Messages must be partitioned by the urn column. This ensures all changes for a given entity are processed in order by routing them to the same Kafka partition.
Example MySQL Connector Configuration
The following is an example configuration for MySQL, similar to what DataHub auto-configures for development and testing:
{
  "name": "datahub-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "datahub_cdc",
    "database.password": "your_secure_password",
    "database.server.id": "184001",
    "database.server.name": "datahub",
    "database.include.list": "datahub",
    "table.include.list": "datahub.metadata_aspect_v2",
    "database.history.kafka.bootstrap.servers": "broker:29092",
    "database.history.kafka.topic": "datahub.schema-changes",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "message.key.columns": "datahub.metadata_aspect_v2:urn",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": ".*metadata_aspect_v2",
    "transforms.route.replacement": "datahub.metadata_aspect_v2",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "minimal",
    "decimal.handling.mode": "string",
    "bigint.unsigned.handling.mode": "long"
  }
}
Example PostgreSQL Connector Configuration
The following is an example configuration for PostgreSQL, similar to what DataHub auto-configures for development and testing:
{
  "name": "datahub-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "datahub_cdc",
    "database.password": "your_secure_password",
    "database.dbname": "datahub",
    "database.server.name": "datahub",
    "plugin.name": "pgoutput",
    "publication.name": "dbz_publication",
    "publication.autocreate.mode": "disabled",
    "slot.name": "debezium",
    "schema.include.list": "public",
    "table.include.list": "public.metadata_aspect_v2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "message.key.columns": "public.metadata_aspect_v2:urn",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": ".*metadata_aspect_v2",
    "transforms.route.replacement": "datahub.metadata_aspect_v2",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "string"
  }
}
Deploy Connector
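Save the connector configuration to a file and submit it to the Kafka Connect REST API. For example (the endpoint and file name below are illustrative; use your own Kafka Connect URL):
# Register the connector with Kafka Connect
curl -X POST -H "Content-Type: application/json" \
  --data @datahub-cdc-connector.json \
  http://kafka-connect:8083/connectors
# Confirm it was created
curl http://kafka-connect:8083/connectors/datahub-cdc-connector/status | jq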
Environment Variables for Manual Setup:
# Enable CDC processing
CDC_MCL_PROCESSING_ENABLED=true
# Disable auto-configuration
CDC_CONFIGURE_SOURCE=false
# CDC topic name (must match connector config)
CDC_TOPIC_NAME=datahub.metadata_aspect_v2
DataHub Configuration
Once database and Debezium are configured, enable CDC mode in DataHub components:
Docker Compose Configuration
datahub-gms:
  environment:
    # Enable CDC mode (disables direct MCL emission from GMS)
    - CDC_MCL_PROCESSING_ENABLED=true
mce-consumer:
  environment:
    # Enable CDC mode (enables CDC message processing)
    - CDC_MCL_PROCESSING_ENABLED=true
    - CDC_TOPIC_NAME=datahub.metadata_aspect_v2
datahub-upgrade:
  environment:
    - CDC_MCL_PROCESSING_ENABLED=true
    - CDC_CONFIGURE_SOURCE=false # indicates manually configured Debezium
    - CDC_DB_TYPE=mysql # or postgres
    - CDC_USER=datahub_cdc
    - CDC_PASSWORD=your_secure_password
Verification
1. Verify Debezium Connector
# Check connector status
curl http://kafka-connect:8083/connectors/datahub-cdc-connector/status | jq
# Expected output shows "state": "RUNNING"
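A healthy connector reports a running state for both the connector and its task; the response resembles the following (trimmed for brevity):
{
  "name": "datahub-cdc-connector",
  "connector": { "state": "RUNNING", "worker_id": "kafka-connect:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083" } ]
}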
2. Verify CDC Topic
# List topics (should see datahub.metadata_aspect_v2)
kafka-topics --bootstrap-server broker:9092 --list | grep metadata_aspect_v2
# Consume CDC events
kafka-console-consumer --bootstrap-server broker:9092 \
--topic datahub.metadata_aspect_v2 \
--from-beginning --max-messages 10
3. Test End-to-End
Make a metadata change via the DataHub UI or API.
Verify the CDC event is captured in the CDC topic (CDC events are in JSON format):
kafka-console-consumer --bootstrap-server broker:9092 \
--topic datahub.metadata_aspect_v2 \
--from-beginning --max-messages 1
Verify the MCL is generated in the MCL topic (MCL events are Avro-serialized and require schema registry access):
# Using kafka-avro-console-consumer (requires schema registry)
kafka-avro-console-consumer --bootstrap-server broker:9092 \
--topic MetadataChangeLog_Versioned_v1 \
--property schema.registry.url=http://schema-registry:8081 \
--from-beginning --max-messages 1
Alternatively, check the MCE Consumer logs to confirm MCLs are being generated:
docker logs mce-consumer | grep -i "Emitting MCL"