Operator-managed Change Data Capture (CDC) is available starting with Operator version 0.9.0.
CDC allows you to connect a Big Peer running in your environment to other cloud applications and third-party services through Kafka Data Bridges.
CDC is organised into individual “Data Bridges”, which work by:
- Capturing changes made to documents in Big Peer
- Filtering these changes based on your configuration
- Publishing them to Kafka topics for consumption by external systems
For an overview of CDC and possible use cases, see Change Data Capture.
Prerequisites
Before setting up CDC, ensure you have:
- Deployed a Big Peer using the Ditto Operator (version 0.9.0 or above)
- Created a Big Peer App
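You can confirm both resources exist before continuing. This is a quick sanity check that assumes the quickstart namespace (ditto) and the plural resource names bigpeers and bigpeerapps; adjust these to match your environment.
# Confirm the Big Peer and Big Peer App exist (namespace and resource names assume the quickstart guide)
kubectl -n ditto get bigpeers.ditto.live
kubectl -n ditto get bigpeerapps.ditto.live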
Creating a Kafka Data Bridge
To create a Kafka data bridge, you need to have a BigPeer deployed and a BigPeerApp associated with it. Once those requirements are met, you can create your data bridge as follows:
cat <<'EOF' | kubectl apply -f -
---
# Create a Kafka BigPeerDataBridge
apiVersion: ditto.live/v1alpha1
kind: BigPeerDataBridge
metadata:
  labels:
    ditto.live/app: example-app
  name: example-data-bridge
  namespace: ditto
spec:
  bridge:
    kafka:
      topicPartitions: 10
  source:
    - collection: tasks
      query: 'true'
      streamType: untyped
  description: Data Bridge used for testing purposes.
EOF
This will create a Kafka data bridge called example-data-bridge, associated with the example-app BigPeerApp from the quickstart guide.
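Provisioning the underlying Kafka resources can take a short while. As a quick check (the exact status fields depend on your Operator version), inspect the bridge status and re-run the command until the Kafka connection details appear:
# Inspect the bridge status; re-run until .status.kafka is populated
kubectl -n ditto get bigpeerdatabridges.ditto.live example-data-bridge -o yaml | yq '.status'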
Filtered vs Unfiltered Bridges
You can create either filtered or unfiltered data bridges:
- Filtered Bridge: Captures only changes to specific collections matching your query (a complete manifest is sketched after this list):
  source:
    - collection: tasks
      query: 'status = "active"' # Only tasks with status "active"
- Unfiltered Bridge: Captures all changes across your entire app:
  source:
    - collection: tasks
      query: 'true' # All documents in the tasks collection
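For reference, a complete filtered-bridge manifest looks just like the example above with only the query changed; the resource name and description below are illustrative:
cat <<'EOF' | kubectl apply -f -
---
# Create a filtered Kafka BigPeerDataBridge (illustrative names)
apiVersion: ditto.live/v1alpha1
kind: BigPeerDataBridge
metadata:
  labels:
    ditto.live/app: example-app
  name: example-filtered-bridge
  namespace: ditto
spec:
  bridge:
    kafka:
      topicPartitions: 10
  source:
    - collection: tasks
      query: 'status = "active"'
      streamType: untyped
  description: Filtered data bridge that only captures active tasks.
EOF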
Topic Partitions
The topicPartitions field allows you to customize the number of Kafka topic partitions (from 1 to 100). Choose this value based on your expected throughput and number of consumers:
- 12 partitions (default): Good for most use cases, allows 1, 2, 3, 4, 6, or 12 parallel consumers
- 24-60 partitions: For higher throughput workloads
See Partitioning Strategies for more details.
Connecting to CDC
To connect to CDC Kafka topics, you’ll need to extract the necessary metadata and credentials.
In the examples below, these will be saved to local files for later use. For production environments, make sure to store credentials securely.
First, fetch the connection info from the BigPeerDataBridge status:
kubectl -n ditto get bigpeerdatabridges.ditto.live example-data-bridge -o yaml | yq '.status.kafka'
The output will look similar to this:
certificateSecretRef:
  cluster:
    certificate: ca.p12
    name: ditto-example-log-cluster-ca-cert
    password: ca.password
  user:
    certificate: user.p12
    name: example-data-bridge-b54e8b21-uc
    password: user.password
consumerGroupPrefix: example-data-bridge-b54e8b21
endpoint: 2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost:443
topic: example-data-bridge-2164bef3-37c0-489c-9ac6-c94b034525d7
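To avoid copy-pasting these values into later commands, you can capture them in shell variables with yq. This is just a convenience sketch assuming the resource name and namespace used above; the steps below keep using the literal values from the example output so each command stands on its own.
# Capture the connection metadata from the bridge status for later use
BRIDGE_STATUS=$(kubectl -n ditto get bigpeerdatabridges.ditto.live example-data-bridge -o yaml)
ENDPOINT=$(echo "$BRIDGE_STATUS" | yq '.status.kafka.endpoint')
TOPIC=$(echo "$BRIDGE_STATUS" | yq '.status.kafka.topic')
GROUP_PREFIX=$(echo "$BRIDGE_STATUS" | yq '.status.kafka.consumerGroupPrefix')
CLUSTER_SECRET=$(echo "$BRIDGE_STATUS" | yq '.status.kafka.certificateSecretRef.cluster.name')
USER_SECRET=$(echo "$BRIDGE_STATUS" | yq '.status.kafka.certificateSecretRef.user.name')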
Cluster Certificate and Password
Extract the cluster certificate and its password from the Kubernetes secret referenced in status.kafka.certificateSecretRef.cluster.name.
SECRET_NAME="ditto-example-log-cluster-ca-cert" # From status.kafka.certificateSecretRef.cluster.name
NAMESPACE="ditto"
# Extract and decode ca.p12
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.ca\.p12}" | base64 -d > cluster.p12
# Extract and decode ca.password
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.ca\.password}" | base64 -d > cluster.password
Topic and Group ID Prefix
The topic name is available in status.kafka.topic.
Consumer group IDs use a per-bridge prefix, so multiple consumer groups can read from the topic independently. This prefix is available in status.kafka.consumerGroupPrefix.
User Certificate and Password
Extract the user certificate and password from the Kubernetes secret referenced in status.kafka.certificateSecretRef.user.name.
SECRET_NAME="example-data-bridge-b54e8b21-uc" # From status.kafka.certificateSecretRef.user.name
NAMESPACE="ditto"
# Extract and decode user.p12
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.user\.p12}" | base64 -d > user.p12
# Extract and decode user.password
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.user\.password}" | base64 -d > user.password
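If you want to sanity-check the extracted PKCS12 files before configuring a consumer, keytool (shipped with a JDK) can list their contents. This is an optional check and assumes keytool is on your PATH:
# Optional: verify the extracted stores open with their passwords (requires a JDK keytool)
keytool -list -storetype PKCS12 -keystore cluster.p12 -storepass "$(cat cluster.password)"
keytool -list -storetype PKCS12 -keystore user.p12 -storepass "$(cat user.password)"
If either command reports a password or integrity error, re-check the extraction steps above.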
Connecting to Kafka
With all the required connection information extracted, you can now connect a consumer to the topic.
See the Change Data Capture documentation for guidance on which parameters to configure in your consumer with this information.
Identify your endpoint
The endpoint to connect to is available in status.kafka.endpoint. If you’ve followed the quickstart guide, it will be:
2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost:443
Run the console consumer
Now we can run the console consumer, supplying the user certs, cluster certs, topic, and a group ID using the group prefix.
KAFKA=/path/to/kafka
$KAFKA/bin/kafka-console-consumer.sh \
  --bootstrap-server 2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost:443 \
  --consumer-property security.protocol=SSL \
  --consumer-property ssl.truststore.type=PKCS12 \
  --consumer-property ssl.truststore.location=./cluster.p12 \
  --consumer-property ssl.truststore.password=$(cat ./cluster.password) \
  --consumer-property ssl.keystore.type=PKCS12 \
  --consumer-property ssl.keystore.location=./user.p12 \
  --consumer-property ssl.keystore.password=$(cat ./user.password) \
  --group example-data-bridge-b54e8b21-my-consumer \
  --topic example-data-bridge-2164bef3-37c0-489c-9ac6-c94b034525d7 \
  --from-beginning
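As an alternative to repeating --consumer-property flags, the same SSL settings can go in a client properties file passed with --consumer.config. This is an optional sketch that reuses the files extracted above; note that it writes the keystore passwords into a plain-text file, so protect it accordingly.
# Write the SSL settings used above into a reusable properties file
cat > client-ssl.properties <<EOF
security.protocol=SSL
ssl.truststore.type=PKCS12
ssl.truststore.location=./cluster.p12
ssl.truststore.password=$(cat ./cluster.password)
ssl.keystore.type=PKCS12
ssl.keystore.location=./user.p12
ssl.keystore.password=$(cat ./user.password)
EOF

# Run the console consumer with the properties file instead of individual flags
$KAFKA/bin/kafka-console-consumer.sh \
  --bootstrap-server 2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost:443 \
  --consumer.config ./client-ssl.properties \
  --group example-data-bridge-b54e8b21-my-consumer \
  --topic example-data-bridge-2164bef3-37c0-489c-9ac6-c94b034525d7 \
  --from-beginning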
If the console runs without error, then you’ve successfully connected.
If you’re running a local kind cluster, you may encounter the following issues:
SSL Handshake Failures
If you see errors like “PKIX path building failed” or “SSL handshake failed”, SSL passthrough is not enabled on your nginx ingress controller. The kind ingress-nginx deployment does not enable this by default.
Enable it by patching the controller:
kubectl patch deployment ingress-nginx-controller -n ingress-nginx \
  --type='json' \
  -p='[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--enable-ssl-passthrough"}]'
Wait for the controller to restart:
kubectl rollout status deployment/ingress-nginx-controller -n ingress-nginx
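To confirm the flag was applied (an optional check), inspect the controller’s container arguments:
# Confirm --enable-ssl-passthrough is now present in the controller args
kubectl -n ingress-nginx get deployment ingress-nginx-controller \
  -o jsonpath='{.spec.template.spec.containers[0].args}' | grep -o 'enable-ssl-passthrough'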
DNS Resolution Errors
If you experience DNS resolution errors, you may need to add an entry to your system’s /etc/hosts file. This is due to how the Java runtime resolves .localhost subdomains.
You can check if DNS resolution is the issue with:
nslookup 2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost
If this fails to resolve, add an entry to /etc/hosts:
echo "127.0.0.1 2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost" | sudo tee -a /etc/hosts
Verify changes are streaming
The easiest way to verify that changes are streaming successfully is by inserting a document through the HTTP API.
If you haven’t already, follow the steps in Using the Big Peer HTTP API to create an API key.
Example document insertion:
curl -X POST http://localhost/2164bef3-37c0-489c-9ac6-c94b034525d7/api/v4/store/execute \
  --header "Authorization: bearer YOUR_API_KEY" \
  --header "Content-Type: application/json" \
  --data-raw '{
    "statement": "INSERT INTO tasks DOCUMENTS (:doc1)",
    "args": {
      "doc1": {
        "_id": "123",
        "body": "Test task",
        "isCompleted": false
      }
    }
  }'
You should see output in your Kafka consumer like:
{
  "txnId": 454875010,
  "version": 1,
  "cdcTimestamp": "2025-03-18T14:26:09.537711736Z",
  "type": "documentChanged",
  "collection": "tasks",
  "change": {
    "method": "upsert",
    "oldValue": null,
    "newValue": {
      "_id": "123",
      "_version": "1",
      "body": "Test task",
      "isCompleted": false
    }
  }
}
Deleting a Data Bridge
To delete a data bridge:
kubectl -n ditto delete bigpeerdatabridge example-data-bridge
This will remove the bridge and stop streaming changes to the associated Kafka topic.
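You can confirm the deletion by listing the bridges remaining in the namespace:
# example-data-bridge should no longer be listed
kubectl -n ditto get bigpeerdatabridges.ditto.live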
Learn More
For more information on CDC concepts, see the Change Data Capture documentation.
Legacy Helm-based Deployment
If you’re using an older version of the Operator or need to use the Helm-based CDC deployment, see Legacy Helm-based CDC.