Big Peer Change Data Capture (CDC) is currently available as a Helm-deployed add-on for Operator-managed Big Peer.

CDC allows you to connect Big Peer running in your environment to other cloud applications and third-party services.

CDC is organised into individual “Data Bridges” which work by:

  1. Capturing changes made to documents in Big Peer
  2. Filtering these changes based on your configuration
  3. Publishing them to Kafka topics for consumption by external systems

For an overview of CDC and possible use cases, see Change Data Capture.

Prerequisites

Before setting up CDC, ensure you have:

  1. Installed the Ditto Operator (version 0.3.0 or above)
  2. Deployed a Big Peer
  3. Created an App on your Big Peer
  4. Deployed Kafka

Deploying the Ditto Operator

Version 0.3.0 or above is required.

Consult the Operator documentation to get started with the Operator.

The examples in this guide assume you’ve deployed on a kind cluster using our recommended kind config, but they can be adjusted to suit your environment.
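For reference, installing the Operator with Helm typically looks like the following. Treat the chart location and release name here as assumptions based on the other charts used in this guide; the Operator documentation is the authoritative source:

helm install ditto-operator \
  oci://quay.io/ditto-external/ditto-operator \
  --namespace ditto \
  --create-namespace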

Deploying a Big Peer

Deploy a Big Peer using a BigPeer custom resource.

For example:

cat <<'EOF' | kubectl apply -f -
---
apiVersion: ditto.live/v1alpha1
kind: BigPeer
metadata:
  name: bp1
  namespace: ditto
spec:
  version: 1.43.0
  network:
    ingress:
      # A unique ingress is needed for each Big Peer
      host: bp1.localhost
  auth:
    providers:
      onlinePlayground:
        anonymous:
          permission:
            read:
              everything: true
              queriesByCollection: {}
            write:
              everything: true
              queriesByCollection: {}
          sessionLength: 630000
          sharedToken: abc123
EOF

This creates a basic Big Peer called bp1, which we’ll reference throughout this guide.
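To confirm the resource was accepted and is being reconciled, you can query it directly (a quick sanity check; this assumes the default plural resource name for the BigPeer kind, and the status columns shown depend on your Operator version):

kubectl get bigpeers.ditto.live -n ditto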

Creating an App

Create an App on your Big Peer using either the Operator API, or a BigPeerApp resource.

For example:

cat <<'EOF' | kubectl apply -f -
---
apiVersion: ditto.live/v1alpha1
kind: BigPeerApp
metadata:
  name: example-app
  namespace: ditto
  labels:
    ditto.live/big-peer: bp1
spec:
  appId: 2164bef3-37c0-489c-9ac6-c94b034525d7
EOF

This creates an App called example-app on the bp1 Big Peer. The appId is used to identify the App across different Big Peers and will be needed when configuring CDC.
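As a quick check, you can list the App resources on the cluster (again assuming the default plural resource name for the BigPeerApp kind):

kubectl get bigpeerapps.ditto.live -n ditto

If you need a fresh appId for your own App, any UUID will do, for example one generated with uuidgen.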

Deploying Kafka

For your convenience, we’ve provided a Helm chart to deploy Kafka. You may need to change the baseDomain value if you want to make the Kafka topics available over a specific domain you have an ingress for.

For this guide, we’ll assume you’ve deployed using the recommended kind cluster deployment, and we’ll establish a path on localhost by setting baseDomain to kafka.localhost:

helm install kafka-connectors \
  oci://quay.io/ditto-external/kafka-connectors \
  --namespace ditto \
  --create-namespace \
  --set baseDomain=kafka.localhost

The names of certain deployed resources depend on the name of the Helm release. The rest of this guide will assume this release is named kafka-connectors.

Wait a few minutes for all the pods to be ready:

kubectl get pods -n ditto -l strimzi.io/cluster=kafka-connectors

NAME                                                READY   STATUS    RESTARTS        AGE
kafka-connectors-entity-operator-75d794565c-7486t   2/2     Running   0               3m
kafka-connectors-kafka-connectors-0                 1/1     Running   0               3m
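Because the chart manages the cluster through Strimzi (as the strimzi.io labels indicate), you can also wait on the Kafka custom resource itself rather than polling pods; a sketch, assuming the cluster resource is named after the release:

kubectl wait kafka/kafka-connectors --for=condition=Ready --timeout=300s -n ditto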

Deploying CDC

CDC is deployed using the ditto-connectors Helm chart.

First, create a Helm values file with the configuration needed to deploy CDC:

cat <<EOF > cdc-values.yaml
# The App ID of your Big Peer App
appId: 2164bef3-37c0-489c-9ac6-c94b034525d7

# Disable MongoDB connector as we're not using it
mongoConnector:
  enabled: false

# The name that was given to the Big Peer in the \`BigPeer\` resource. In this case, 'bp1'
bigPeerName: bp1

# CDC needs to be configured with the Kafka cluster deployed in the first step. The cluster name will match the release name used when installing using the chart provided.
# If you followed the example above, this can be left with the default of 'kafka-connectors'. Otherwise, un-comment and update this below.
# cdc:
#   kafka:
#     clusterName: kafka-connectors

# CDC Data Bridge configuration
streamSplitter:
  enabled: true
  config:
    # This Data Bridge filters for blue cars
    - enabled: true
      streamModifier:
        filters:
          collection: cars
          expression: "color = 'blue'"
    # This Data Bridge streams all documents
    - enabled: true
      streamModifier:
        filters: {}
EOF
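If you’d like to review the full set of configurable options before installing, you can inspect the chart’s default values:

helm show values oci://quay.io/ditto-external/ditto-connectors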

With your configuration values set, deploy CDC using:

helm install ditto-cdc \
  oci://quay.io/ditto-external/ditto-connectors \
  --namespace ditto \
  --create-namespace \
  -f cdc-values.yaml

After a few moments, you should see cdc, cdc-heartbeat and stream-splitter pods running:

kubectl get pods -n ditto -l app.kubernetes.io/instance=ditto-cdc

NAME                                                              READY   STATUS    RESTARTS   AGE
cdc-2164bef3-37c0-489c-9ac6-c94b034525d7-5f749947fb-rxj5s         1/1     Running   0         4m
cdc-heartbeat-2164bef3-37c0-489c-9ac6-c94b034525d7-6cbb947vdjjr   1/1     Running   0         4m
stream-splitter-2164bef3-37c0-489c-9ac6-c94b034525d7-7d54dwbtzb   1/1     Running   0         4m

Connecting to CDC

To connect to CDC Kafka topics, you’ll need to extract the necessary metadata and credentials.

In the examples below, these will be saved to local files for later use. For production environments, make sure to store credentials securely.

Extracting Credentials

Cluster Certificate and Password

Start by extracting the cluster certificate and its password, which are stored in a Kubernetes secret created during Kafka cluster deployment (in the same namespace).

If you followed the Kafka deployment steps exactly, it will be called kafka-connectors-cluster-ca-cert. Otherwise, it will take the name of the release used for the Kafka installation, suffixed by -cluster-ca-cert.

From this secret, extract and base64 decode the PKCS#12 certificate and password for use with your Kafka client.

# Name of the secret - replace 'kafka-connectors' with your release name if different
SECRET_NAME="kafka-connectors-cluster-ca-cert"
NAMESPACE="ditto"

# Extract and decode ca.p12
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.ca\.p12}" | base64 -d > ca.p12

# Extract and decode ca.password
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.ca\.password}" | base64 -d > ca.password
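You can sanity-check the extracted files with keytool (bundled with the JDK); the same command also works for the user.p12 file extracted later:

keytool -list -keystore ca.p12 -storetype PKCS12 -storepass "$(cat ca.password)"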

Topic Selection

Next, we need to choose a topic to connect to.

You can see the full list of topics created with:

kubectl get kafkatopic -n ditto -l strimzi.io/cluster=kafka-connectors

Example output:

NAME                                                    CLUSTER            PARTITIONS   REPLICATION FACTOR   READY
2164bef3-37c0-489c-9ac6-c94b034525d7-all-true           kafka-connectors   1            1                    True
2164bef3-37c0-489c-9ac6-c94b034525d7-cars-colorblue     kafka-connectors   1            1                    True
heartbeats-cdc-2164bef3-37c0-489c-9ac6-c94b034525d7     kafka-connectors   1            1                    True
hydra-cdc-output-2164bef3-37c0-489c-9ac6-c94b034525d7   kafka-connectors   1            1                    True

The values in the NAME column are the topic names.

For this guide, we’ll choose the 2164bef3-37c0-489c-9ac6-c94b034525d7-all-true topic.

Group ID Prefix

The Kafka topics make use of a group ID prefix, so that multiple consumer groups can read from the topic independently.

This prefix is identical to the name of the topic you selected in the previous step.
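For example, with the topic chosen above, a group ID such as 2164bef3-37c0-489c-9ac6-c94b034525d7-all-true-1 (as used later in this guide) satisfies the prefix requirement; the suffix distinguishes your consumer group from others.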

User Certificate and Password

Lastly, we need to obtain the user certificate and password.

These are stored in a secret named identically to the topic.

# The secret name is identical to your topic name - replace it if you chose a different topic
SECRET_NAME="2164bef3-37c0-489c-9ac6-c94b034525d7-all-true"
NAMESPACE="ditto"

# Extract and decode .p12
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.user\.p12}" | base64 -d > user.p12

# Extract and decode password
kubectl get secret $SECRET_NAME -n $NAMESPACE -o jsonpath="{.data.user\.password}" | base64 -d > user.password

Connecting to Kafka

With all the required connection information extracted, you can now connect a consumer to the topic.

See the Change Data Capture documentation for guidance on which parameters to configure in your consumer with this information.
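If you’re configuring your own Kafka client rather than the console consumer used below, the extracted files map onto the standard Kafka SSL client properties roughly as follows (a sketch; property names can vary slightly between client libraries):

security.protocol=SSL
ssl.truststore.location=./ca.p12
ssl.truststore.password=<contents of ca.password>
ssl.keystore.location=./user.p12
ssl.keystore.password=<contents of user.password>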

For this guide, we’ll follow the same verification steps as in the CDC docs.

1. Identify your endpoint hostname

The endpoint to connect the console consumer to will vary depending on how you configured your ingress earlier.

To check, you can inspect your ingresses:

kubectl get ingress -n ditto

The relevant ingress should have a name that corresponds to the Helm release name.

If you’ve followed the steps in this guide, it will be called kafka-connectors-kafka-connectors-0.

From here, you can obtain the hostname with:

kubectl get ingress kafka-connectors-kafka-connectors-0 -n ditto -o jsonpath='{.spec.rules[0].host}'

Following these examples, this will return ditto-broker-0.kafka.localhost.

2. Run the console consumer

Now we can run the console consumer, supplying the user certificate, cluster certificate, topic, and a group ID that uses the group ID prefix.

For example:

USER_CERT_LOCATION=./user.p12
USER_CERT_PW=$(cat ./user.password)

CLUSTER_CERT_LOCATION=./ca.p12
CLUSTER_CERT_PW=$(cat ./ca.password)

CLOUD_ENDPOINT=ditto-broker-0.kafka.localhost:443

TOPIC=2164bef3-37c0-489c-9ac6-c94b034525d7-all-true
GROUP=2164bef3-37c0-489c-9ac6-c94b034525d7-all-true-1

# Path to your Kafka installation - adjust if needed
KAFKA=.

$KAFKA/bin/kafka-console-consumer.sh \
  --bootstrap-server $CLOUD_ENDPOINT \
  --consumer-property security.protocol=SSL \
  --consumer-property ssl.truststore.password=$CLUSTER_CERT_PW \
  --consumer-property ssl.truststore.location=$CLUSTER_CERT_LOCATION \
  --consumer-property ssl.keystore.password=$USER_CERT_PW \
  --consumer-property ssl.keystore.location=$USER_CERT_LOCATION \
  --consumer-property client.dns.lookup=resolve_canonical_bootstrap_servers_only \
  --group $GROUP \
  --topic $TOPIC

If the console runs without error, then you’ve successfully connected.

If you’ve followed the getting started guide, running a kind cluster and hosting the ingress on kafka.localhost, you may experience some DNS errors here.

This is due to how the Java runtime, which the console consumer is built upon, resolves DNS.

Adding an entry to your system's /etc/hosts should address this issue (this typically requires root privileges), for example: echo "127.0.0.1 ditto-broker-0.kafka.localhost" | sudo tee -a /etc/hosts

3. Verify changes are streaming

The easiest way to verify that changes are streaming successfully is by inserting a document through the HTTP API.

If you haven’t already, follow the steps in Using the Big Peer HTTP API to create an API key.

Example document insertion:

curl -X POST http://bp1.localhost/2164bef3-37c0-489c-9ac6-c94b034525d7/api/v4/store/execute \
  --header "Authorization: bearer YOUR_API_KEY" \
  --header "Content-Type: application/json" \
  --data-raw '{
    "statement": "INSERT INTO cars DOCUMENTS (:doc1)",
    "args": {
      "doc1": {
        "_id": {"id": "002", "locationId": "2345"},
        "color": "blue",
        "type": "suv"
      }
    }
  }'

You should see output in your Kafka consumer like:

{
  "txnId": 9358,
  "version": 1,
  "cdcTimestamp": "2025-04-30T13:21:22.330679884Z",
  "type": "documentChanged",
  "collection": "cars",
  "change": {
    "method": "upsert",
    "oldValue": null,
    "newValue": {
      "_id": {"id": "002", "locationId": "2345"},
      "_version": "1",
      "color": "blue",
      "type": "suv"
    }
  }
}
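For ad-hoc inspection, the JSON events pair well with jq; for example, appending the following to the consumer command from step 2 prints only the new document state for each change:

  | jq 'select(.type == "documentChanged") | .change.newValue'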

Uninstalling

To uninstall, run helm delete on the Helm release made during installation:

helm delete ditto-cdc --namespace ditto
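If you also want to remove the Kafka cluster deployed earlier, delete that Helm release as well; note that this tears down the brokers and any data stored in the topics:

helm delete kafka-connectors --namespace ditto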