Kafka Connector
This topic provides step-by-step instructions for integrating your Kafka topic with Ditto, along with an overview of the Ditto event notifications that indicate changes have occurred.
To upgrade your organization to the Kafka Connector, complete the Get in Touch form.
Once submitted, Ditto will provide you with the necessary next steps to proceed with your premium upgrade.
To integrate the Kafka Connector with your Kafka consumer:
1. Set up Kafka in your local environment. (Getting Started)
2. Get your access certificates and connect Ditto to your Kafka instance. (Obtaining Certificates and Connecting)
3. Prepare your Kafka environment for Ditto. (Setting Up and Configuring)
4. Perform a simple validation test to verify that you have successfully integrated Ditto with your Kafka instance. (Verifying Integration)
5. Trigger a simple event to test your consumer. (Ditto Events)
Getting Started
If you're new to Kafka, complete the steps provided in the official Apache Kafka Quickstart guide to set up and try out basic Kafka operations on your local environment.
When completing the Apache Kafka Quickstart steps, use the scripts located in the bin directory of the Kafka 3.1.0 distribution (kafka_2.13-3.1.0).
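For example, the quickstart's first steps start ZooKeeper and the Kafka broker from that directory, using the default configuration files shipped with the distribution:

```bash
# From the kafka_2.13-3.1.0 directory:

# Start ZooKeeper (terminal 1)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the Kafka broker (terminal 2)
bin/kafka-server-start.sh config/server.properties
```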
Obtaining Certificates and Connecting
Securely connect Ditto to your Kafka instance by using the certificates that Ditto provides for your dedicated cluster to establish a Secure Sockets Layer (SSL) connection.
The following Kafka settings are enabled only if your organization has upgraded to the Kafka premium option and is on a dedicated cluster.
To get your certificates, go to the Ditto portal > Live Query Settings and click Download next to Cluster Certificate and User Certificate.
Prevent unauthorized access and potential security breaches by storing your downloaded certificates in a secure location.

Setting Up and Configuring
Integrate Ditto with your Kafka consumer configuration by using the following mappings between authentication tokens and their respective Kafka settings for SSL connections:
| SSL Connection | Ditto Name | Description |
| --- | --- | --- |
| ssl.truststore.location | Cluster certificate | The certificate authority (CA) certificate in PKCS12 format. |
| ssl.truststore.password | Cluster certificate password | Decrypts the value of the CA certificate. |
| ssl.keystore.location | User certificate | The location of the user certificate in PKCS12 format. |
| ssl.keystore.password | User certificate password | Decrypts the user certificate. |
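Applied to a Kafka consumer configuration file, these mappings might look like the following sketch. The file paths and passwords are placeholders, and the security.protocol and store-type lines are assumptions (standard Kafka client settings, added here because the certificates are in PKCS12 format); substitute the values from your portal downloads.

```properties
# consumer.properties — SSL settings for connecting to the Ditto-provided cluster
security.protocol=SSL

# Cluster certificate (CA) in PKCS12 format
ssl.truststore.location=/path/to/cluster.p12
ssl.truststore.type=PKCS12
ssl.truststore.password=<cluster certificate password>

# User certificate in PKCS12 format
ssl.keystore.location=/path/to/user.p12
ssl.keystore.type=PKCS12
ssl.keystore.password=<user certificate password>
```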
Verifying Integration
Once you've set up and configured your Kafka topic with Ditto, perform a simple test to validate successful integration:
Copy and paste the following bash script into a terminal, and then replace each variable with the relevant information displayed in the Ditto portal > Live Query Settings for your app:
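A minimal sketch of such a script follows, assuming the kafka_2.13-3.1.0 scripts from Getting Started and using placeholder values throughout; the broker endpoint, topic, group, and certificate details are all assumptions to be replaced with the values from the portal.

```bash
#!/usr/bin/env bash
# All of these values are placeholders — copy the real ones from
# Ditto portal > Live Query Settings for your app.
ENDPOINT="<kafka broker endpoint>"
KAFKA_TOPIC="<Kafka Topic from the portal>"
KAFKA_GROUP="<Kafka Group from the portal>"
CLUSTER_CERT="/path/to/cluster.p12"
CLUSTER_CERT_PW="<cluster certificate password>"
USER_CERT="/path/to/user.p12"
USER_CERT_PW="<user certificate password>"

# Consume events from the Ditto-provided topic over SSL.
kafka_2.13-3.1.0/bin/kafka-console-consumer.sh \
  --bootstrap-server "$ENDPOINT" \
  --topic "$KAFKA_TOPIC" \
  --group "$KAFKA_GROUP" \
  --consumer-property security.protocol=SSL \
  --consumer-property ssl.truststore.type=PKCS12 \
  --consumer-property ssl.truststore.location="$CLUSTER_CERT" \
  --consumer-property ssl.truststore.password="$CLUSTER_CERT_PW" \
  --consumer-property ssl.keystore.type=PKCS12 \
  --consumer-property ssl.keystore.location="$USER_CERT" \
  --consumer-property ssl.keystore.password="$USER_CERT_PW"
```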
Make sure that the values you enter for the Kafka Group and Kafka Topic settings match the strings that display in the Ditto portal > Live Query Settings for your app. This alignment is critical for proper integration between Ditto and Kafka.
If the test is successful, no errors display in your terminal, and the script continues running without shutting down.
With the script still running in your terminal, perform a write operation in the codebase of your app and verify that an event is written to the console each time a change replicates to the Big Peer.
For example, the following demonstrates upserting a new value to the Ditto store:
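This curl sketch performs the upsert through the Ditto HTTP API; the endpoint path, request body shape, and authorization header here are assumptions to confirm against the HTTP API Reference.

```bash
# Sketch only: the endpoint path, body shape, and auth header are assumptions —
# confirm them against the HTTP API Reference for your app.
curl -X POST "https://<your-app-id>.cloud.ditto.live/api/v4/store/write" \
  -H "Authorization: Bearer <your API key>" \
  -H "Content-Type: application/json" \
  -d '{
        "commands": [{
          "method": "upsert",
          "collection": "people",
          "id": "abc123",
          "value": { "name": "Susan", "age": 31 }
        }]
      }'
```

Shortly after the write replicates to the Big Peer, the consumer script from the previous step should print a corresponding event.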
If you want to react to a consumable event by writing changes to the Ditto store, use the HTTP API. For more information, see Ditto Events below and the HTTP API Reference.
Ditto Events
Once your consumer is active, events appear in the console whenever a change replicates to the Big Peer.
When a new document is created, the method is upsert and the change.oldValue property is always null.
The txnId field is a timestamp describing when the Big Peer internally replicated the data modifications from Small Peers; this timestamp is an always-increasing value.
The following snippet is an example of a JSON event stream that appears in your Kafka console when a new document is upserted to Ditto:
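A representative sketch is shown below. The txnId, change.method, change.oldValue, and change.newValue fields follow the descriptions in this topic; the envelope fields (type and collection) and the sample values are assumptions.

```json
{
  "txnId": 552789,
  "type": "documentChanged",
  "collection": "people",
  "change": {
    "method": "upsert",
    "oldValue": null,
    "newValue": {
      "_id": "abc123",
      "name": "Susan",
      "age": 31
    }
  }
}
```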
The following table provides an overview of the JSON event stream that displays in the Kafka console when an upsert method executes and modifies a document:
| Field Property | Description |
| --- | --- |
| change.oldValue | Contains the previous version of the document. |
| change.newValue | Once the upsert operation completes, contains the complete document: both the fields that changed and the fields that remain unchanged. |
For instance, the following snippet shows changes to document 6213e9c90012e4af0017cb9f:
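As a hedged stand-in for that change, the same assumed HTTP API endpoint from the earlier sketch can upsert a modified value for the document:

```bash
# Sketch only — the endpoint and body shape are assumptions (see the HTTP API Reference).
curl -X POST "https://<your-app-id>.cloud.ditto.live/api/v4/store/write" \
  -H "Authorization: Bearer <your API key>" \
  -H "Content-Type: application/json" \
  -d '{
        "commands": [{
          "method": "upsert",
          "collection": "people",
          "id": "6213e9c90012e4af0017cb9f",
          "value": { "name": "Susan", "age": 32 }
        }]
      }'
```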
Once the update operation executes the changes to the document, the following JSON event stream appears in your Kafka console:
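A hedged illustration, using the same assumed envelope as above; here change.oldValue carries the previous version and change.newValue the complete updated document:

```json
{
  "txnId": 552790,
  "type": "documentChanged",
  "collection": "people",
  "change": {
    "method": "upsert",
    "oldValue": {
      "_id": "6213e9c90012e4af0017cb9f",
      "name": "Susan",
      "age": 31
    },
    "newValue": {
      "_id": "6213e9c90012e4af0017cb9f",
      "name": "Susan",
      "age": 32
    }
  }
}
```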
When a document is removed using the remove method, the change.value field in your Kafka console's JSON event stream contains the full document at the time of its removal, as follows:
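In this hedged illustration, the remove method and the change.value field come from this topic; the rest of the envelope is assumed, as above:

```json
{
  "txnId": 552791,
  "type": "documentChanged",
  "collection": "people",
  "change": {
    "method": "remove",
    "value": {
      "_id": "6213e9c90012e4af0017cb9f",
      "name": "Susan",
      "age": 32
    }
  }
}
```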
When the consumer stream fails to keep up with incoming changes or there are gaps in the data, a requeryRequired event type appears in your Kafka console. For example:
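In this hedged illustration, the requeryRequired event type and the per-collection list of documents follow this topic's description, while the exact field names (documents, collection, ids) are assumptions:

```json
{
  "txnId": 552792,
  "type": "requeryRequired",
  "documents": [
    {
      "collection": "people",
      "ids": ["6213e9c90012e4af0017cb9f"]
    }
  ]
}
```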
If you receive a requeryRequired event, catch your system up by calling the HTTP RPC API endpoint for each document listed in the event.
As demonstrated in the following snippet, do the following in each of your API calls:
- Use the transaction_id to query.
- Set the value of the X-DITTO-TXN-ID header to txnId.
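A hedged sketch of one such call follows. The X-DITTO-TXN-ID header comes from this topic; the endpoint path, query syntax, and body shape are assumptions to check against the HTTP API Reference.

```bash
# For each document listed in the requeryRequired event (sketch only —
# the endpoint path and body are assumptions; the header name is from this topic).
curl -X POST "https://<your-app-id>.cloud.ditto.live/api/v4/store/find" \
  -H "Authorization: Bearer <your API key>" \
  -H "X-DITTO-TXN-ID: 552792" \
  -H "Content-Type: application/json" \
  -d '{
        "collection": "people",
        "query": "_id == \"6213e9c90012e4af0017cb9f\""
      }'
```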