Operator-managed Change Data Capture (CDC) is available starting with Operator version 0.9.0.
CDC allows you to connect a Big Peer running in your environment to other cloud applications and third-party services through Kafka Data Bridges. CDC is organized into individual “Data Bridges”, which work by:
  1. Capturing changes made to documents in Big Peer
  2. Filtering these changes based on your configuration
  3. Publishing them to Kafka topics for consumption by external systems
For an overview of CDC and possible use cases, see Change Data Capture.

Prerequisites

Before setting up CDC, ensure you have:
  1. Deployed a Big Peer using the Ditto Operator (version 0.9.0 or above)
  2. Created a Big Peer App

Creating a Kafka Data Bridge

To create a Kafka data bridge, you need to have a BigPeer deployed and a BigPeerApp associated with it. Once those requirements are met, you can create your data bridge as follows:
cat <<'EOF' | kubectl apply -f -
---
# Create a kafka BigPeerDataBridge
apiVersion: ditto.live/v1alpha1
kind: BigPeerDataBridge
metadata:
  labels:
    ditto.live/app: example-app
  name: example-data-bridge
  namespace: ditto
spec:
  bridge:
    kafka:
      topicPartitions: 10
      source:
        - collection: tasks
          query: 'true'
      streamType: untyped
  description: Data Bridge used for testing purposes.
EOF
This will create a Kafka data bridge called example-data-bridge, associated with the example-app BigPeerApp from the quickstart guide.
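After applying the manifest, you can confirm that the Operator has accepted and reconciled the bridge:
# List the data bridges in the namespace
kubectl -n ditto get bigpeerdatabridges.ditto.live

# Inspect the full resource, including its status
kubectl -n ditto get bigpeerdatabridges.ditto.live example-data-bridge -o yaml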

Filtered vs Unfiltered Bridges

You can create either filtered or unfiltered data bridges:
  • Filtered Bridge: Captures only the changes to documents in the source collection that match your query (see the complete manifest sketch below)
    source:
      - collection: tasks
        query: 'status = "active"'  # Only tasks with status "active"
  • Unfiltered Bridge: Captures all changes to documents in the source collection
    source:
      - collection: tasks
        query: 'true'  # All documents in the tasks collection

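Putting this together, a complete filtered bridge uses the same manifest shape as the earlier example, differing only in its query. A sketch (the bridge name, description, and query values are illustrative):
cat <<'EOF' | kubectl apply -f -
---
apiVersion: ditto.live/v1alpha1
kind: BigPeerDataBridge
metadata:
  labels:
    ditto.live/app: example-app
  name: example-filtered-bridge
  namespace: ditto
spec:
  bridge:
    kafka:
      topicPartitions: 10
      source:
        - collection: tasks
          query: 'status = "active"'
      streamType: untyped
  description: Streams only active tasks.
EOF
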
Topic Partitions

The topicPartitions field allows you to customize the number of Kafka topic partitions (from 1 to 100). Choose this value based on your expected throughput and number of consumers:
  • 12 partitions (default): Good for most use cases, allows 1, 2, 3, 4, 6, or 12 parallel consumers
  • 24-60 partitions: For higher throughput workloads
See Partitioning Strategies for more details.
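For example, to provision a bridge for a higher-throughput workload, you would set the field in the spec shown earlier:
spec:
  bridge:
    kafka:
      topicPartitions: 24  # a consumer group can run at most 24 consumers in parallel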

Connecting to CDC

To connect to CDC Kafka topics, you’ll need to extract the necessary metadata and credentials. In the examples below, these will be saved to local files for later use. For production environments, make sure to store credentials securely.

Extracting Credentials

First, fetch the connection info from the BigPeerDataBridge status:
kubectl -n ditto get bigpeerdatabridges.ditto.live example-data-bridge -o yaml | yq '.status.kafka'
The output will look similar to this:
certificateSecretRef:
  cluster:
    certificate: ca.p12
    name: ditto-example-log-cluster-ca-cert
    password: ca.password
  user:
    certificate: user.p12
    name: example-data-bridge-b54e8b21-uc
    password: user.password
consumerGroupPrefix: example-data-bridge-b54e8b21
endpoint: 2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost:443
topic: example-data-bridge-2164bef3-37c0-489c-9ac6-c94b034525d7
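For scripted setups, you can capture the endpoint and topic directly into shell variables using kubectl's jsonpath output:
# Capture the bootstrap endpoint and topic name from the bridge status
ENDPOINT=$(kubectl -n ditto get bigpeerdatabridges.ditto.live example-data-bridge \
  -o jsonpath='{.status.kafka.endpoint}')
TOPIC=$(kubectl -n ditto get bigpeerdatabridges.ditto.live example-data-bridge \
  -o jsonpath='{.status.kafka.topic}')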

Cluster Certificate and Password

Extract the cluster certificate and its password from the Kubernetes secret referenced in status.kafka.certificateSecretRef.cluster.name.
SECRET_NAME="ditto-example-log-cluster-ca-cert"  # From status.kafka.certificateSecretRef.cluster.name
NAMESPACE="ditto"

# Extract and decode ca.p12
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.ca\.p12}" | base64 -d > cluster.p12

# Extract and decode ca.password
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.ca\.password}" | base64 -d > cluster.password

Topic and Group ID Prefix

The topic name is available in status.kafka.topic. Consumer group IDs are prefixed with the value in status.kafka.consumerGroupPrefix, which allows multiple consumer groups to read from the topic independently.
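For example, you could derive a consumer group ID from the prefix like this (the -my-consumer suffix is arbitrary):
GROUP_PREFIX=$(kubectl -n ditto get bigpeerdatabridges.ditto.live example-data-bridge \
  -o jsonpath='{.status.kafka.consumerGroupPrefix}')
GROUP_ID="${GROUP_PREFIX}-my-consumer"  # e.g. example-data-bridge-b54e8b21-my-consumer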

User Certificate and Password

Extract the user certificate and password from the Kubernetes secret referenced in status.kafka.certificateSecretRef.user.name.
SECRET_NAME="example-data-bridge-b54e8b21-uc"  # From status.kafka.certificateSecretRef.user.name
NAMESPACE="ditto"

# Extract and decode user.p12
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.user\.p12}" | base64 -d > user.p12

# Extract and decode user.password
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.user\.password}" | base64 -d > user.password
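
If you want to sanity-check the extracted keystores before connecting, a JDK's keytool can list their contents (assuming keytool is on your PATH):
# Each keystore should list its entries; a wrong password or corrupt file fails here
keytool -list -storetype PKCS12 -keystore cluster.p12 -storepass "$(cat cluster.password)"
keytool -list -storetype PKCS12 -keystore user.p12 -storepass "$(cat user.password)"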

Connecting to Kafka

With all the required connection information extracted, you can now connect a consumer to the topic. See the Change Data Capture documentation for guidance on how these values map onto your consumer's configuration.
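For reference, the SSL settings used by the console consumer in step 2 below map onto a standard Kafka client properties file. A sketch, assuming the files extracted above are in the current directory (properties files cannot read passwords from other files, so the values must be inlined):
# client.properties -- substitute the contents of cluster.password and user.password
security.protocol=SSL
ssl.truststore.type=PKCS12
ssl.truststore.location=./cluster.p12
ssl.truststore.password=<contents of cluster.password>
ssl.keystore.type=PKCS12
ssl.keystore.location=./user.p12
ssl.keystore.password=<contents of user.password>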

Step 1: Identify your endpoint

The endpoint to connect to is available in status.kafka.endpoint. If you’ve followed the quickstart guide, it will be:
2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost:443

Step 2: Run the console consumer

Now we can run the console consumer, supplying the cluster certificate, the user certificate, the topic, and a consumer group ID built from the group prefix.
KAFKA=/path/to/kafka

$KAFKA/bin/kafka-console-consumer.sh \
  --bootstrap-server 2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost:443 \
  --consumer-property security.protocol=SSL \
  --consumer-property ssl.truststore.type=PKCS12 \
  --consumer-property ssl.truststore.location=./cluster.p12 \
  --consumer-property ssl.truststore.password=$(cat ./cluster.password) \
  --consumer-property ssl.keystore.type=PKCS12 \
  --consumer-property ssl.keystore.location=./user.p12 \
  --consumer-property ssl.keystore.password=$(cat ./user.password) \
  --group example-data-bridge-b54e8b21-my-consumer \
  --topic example-data-bridge-2164bef3-37c0-489c-9ac6-c94b034525d7 \
  --from-beginning
If the console runs without error, then you’ve successfully connected.
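You can also reuse the same connection settings to inspect the topic itself, for example with the kafka-topics tool and a client.properties file as sketched earlier; the describe output should report the partition count you configured via topicPartitions:
$KAFKA/bin/kafka-topics.sh \
  --bootstrap-server 2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost:443 \
  --command-config ./client.properties \
  --describe \
  --topic example-data-bridge-2164bef3-37c0-489c-9ac6-c94b034525d7
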
If you’re running a local kind cluster, you may encounter the following issues:

SSL Handshake Failures

If you see errors like “PKIX path building failed” or “SSL handshake failed”, SSL passthrough is not enabled on your nginx ingress controller. The kind ingress-nginx deployment does not enable this by default. Enable it by patching the controller:
kubectl patch deployment ingress-nginx-controller -n ingress-nginx \
  --type='json' \
  -p='[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--enable-ssl-passthrough"}]'
Wait for the controller to restart:
kubectl rollout status deployment/ingress-nginx-controller -n ingress-nginx

DNS Resolution Errors

If you experience DNS resolution errors, you may need to add an entry to your system’s /etc/hosts file. This is due to how the Java runtime resolves .localhost subdomains. You can check if DNS resolution is the issue with:
nslookup 2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost
If this fails to resolve, add an entry to /etc/hosts:
echo "127.0.0.1       2164bef3-37c0-489c-9ac6-c94b034525d7-bootstrap.localhost" | sudo tee -a /etc/hosts

Step 3: Verify changes are streaming

The easiest way to verify that changes are streaming successfully is by inserting a document through the HTTP API. If you haven’t already, follow the steps in Using the Big Peer HTTP API to create an API key.

Example document insertion:
curl -X POST http://localhost/2164bef3-37c0-489c-9ac6-c94b034525d7/api/v4/store/execute \
  --header "Authorization: Bearer YOUR_API_KEY" \
  --header "Content-Type: application/json" \
  --data-raw '{
    "statement": "INSERT INTO tasks DOCUMENTS (:doc1)",
    "args": {
      "doc1": {
        "_id": "123",
        "body": "Test task",
        "isCompleted": false
      }
    }
  }'
You should see output in your Kafka consumer like:
{
  "txnId": 454875010,
  "version": 1,
  "cdcTimestamp": "2025-03-18T14:26:09.537711736Z",
  "type": "documentChanged",
  "collection": "tasks",
  "change": {
    "method": "upsert",
    "oldValue": null,
    "newValue": {
      "_id": "123",
      "_version": "1",
      "body": "Test task",
      "isCompleted": false
    }
  }
}

Deleting a Data Bridge

To delete a data bridge:
kubectl -n ditto delete bigpeerdatabridge example-data-bridge
This will remove the bridge and stop streaming changes to the associated Kafka topic.
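You can confirm the bridge is gone by listing the remaining resources:
kubectl -n ditto get bigpeerdatabridges.ditto.live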

Learn More

For more information on CDC concepts, see the Change Data Capture documentation.

Legacy Helm-based Deployment

If you’re using an older version of the Operator or need to use the Helm-based CDC deployment, see Legacy Helm-based CDC.