Kafka Connector


This topic provides step-by-step instructions for integrating your Kafka topic with Ditto, along with an overview of Ditto event notifications that indicate changes have occurred:

  • Requesting Upgrade
  • Integrating Kafka Connector

Requesting Upgrade

To upgrade your organization to the Kafka Connector, complete the Get in Touch form.

Once submitted, Ditto will provide you with the necessary next steps to proceed with your premium upgrade.

Integrating Kafka Connector

To integrate the Kafka Connector with your Kafka consumer:

1. Set up Kafka in your local environment. (Getting Started)

2. Get your access certificates and connect Ditto to your Kafka instance. (Obtaining Certificates and Connecting)

3. Prepare your Kafka environment for Ditto. (Setting Up)

4. Perform a simple validation test to verify that you have successfully integrated Ditto with your Kafka instance. (Verifying Integration)

5. Trigger a simple event to test your consumer.

Getting Started

If you're new to Kafka, complete the steps provided in the official Apache Kafka Quickstart guide to set up and try out basic Kafka operations on your local environment.

When completing the Apache Kafka Quickstart steps, use the scripts located in the bin directory of Kafka version kafka_2.13-3.1.0.

Obtaining Certificates and Connecting

Securely connect Ditto to your Kafka instance by establishing a Secure Sockets Layer (SSL) connection using the certificates Ditto provides for your dedicated cluster.

The following Kafka settings are enabled only if your organization has been upgraded to the Kafka premium option and is on a dedicated cluster.

To get your certificates, go to the Ditto portal > Live Query Settings and click Download next to Cluster Certificate and User Certificate.

Prevent unauthorized access and potential security breaches by storing your downloaded certificates in a secure location.



Setting Up

Use the following mappings between the credentials Ditto provides and their respective Kafka SSL configuration settings to integrate your consumer:

| SSL Connection | Ditto Name | Description |
| --- | --- | --- |
| ssl.truststore.location | Cluster certificate | The certificate authority (CA) certificate in PKCS12 format. |
| ssl.truststore.password | Cluster certificate password | Decrypts the value of the CA certificate. |
| ssl.keystore.location | User certificate | The location of the user certificate in PKCS12 format. |
| ssl.keystore.password | User certificate password | Decrypts the user certificate. |
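
For example, if your consumer uses the standard Java Kafka client, these settings map directly onto consumer properties. The following is a minimal sketch rather than an official Ditto sample; the certificate paths, passwords, endpoint, group, and topic are placeholders for the values shown in the Ditto portal > Live Query Settings:

Java
|
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DittoKafkaSslExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Cloud endpoint shown in Live Query Settings.
        props.put("bootstrap.servers", "<YOUR_ENDPOINT>");
        props.put("security.protocol", "SSL");
        // Cluster certificate (CA) in PKCS12 format and the password that decrypts it.
        props.put("ssl.truststore.type", "PKCS12");
        props.put("ssl.truststore.location", "/path/to/cluster.p12");
        props.put("ssl.truststore.password", "<YOUR_CLUSTER_CERT_PASSWORD>");
        // User certificate in PKCS12 format and the password that decrypts it.
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "/path/to/user.p12");
        props.put("ssl.keystore.password", "<YOUR_USER_CERT_PASSWORD>");
        // Group and topic must match the strings shown in Live Query Settings.
        props.put("group.id", "<YOUR_TOPIC>");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("<YOUR_TOPIC>"));
            // consumer.poll(...) now returns Ditto events as JSON strings
            // (see Events Overview below).
        }
    }
}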

Verifying Integration

Once you've set up and configured your Kafka topic with Ditto, perform a simple test to validate successful integration:

1. Copy the following bash script into a terminal, and then replace each variable with the relevant information that displays in the Ditto portal > Live Query Settings for your app:

Make sure that the values you enter in your configuration for the Kafka Group and Kafka Topic settings match the strings that display in the Ditto portal > Live Query Settings for your app.

This alignment is critical for proper integration between Ditto and Kafka.

If the test is successful, no errors display in your terminal, and the script continues running without shutting down.

Bash
|
CLUSTER_CERT_LOCATION=/path/to/cluster.p12
CLUSTER_CERT_PW=<YOUR_CLUSTER_CERT_PASSWORD>

USER_CERT_LOCATION=/path/to/user.p12
USER_CERT_PW=<YOUR_USER_CERT_PASSWORD>

CLOUD_ENDPOINT=<YOUR_ENDPOINT>
TOPIC=<YOUR_TOPIC>

KAFKA=/path/to/kafka_2.13-3.1.0

$KAFKA/bin/kafka-console-consumer.sh \
 --bootstrap-server $CLOUD_ENDPOINT \
 --consumer-property security.protocol=SSL \
 --consumer-property ssl.truststore.password=$CLUSTER_CERT_PW \
 --consumer-property ssl.truststore.location=$CLUSTER_CERT_LOCATION \
 --consumer-property ssl.keystore.password=$USER_CERT_PW \
 --consumer-property ssl.keystore.location=$USER_CERT_LOCATION \
 --group $TOPIC \
 --topic $TOPIC 

2. With the script still running in your terminal, perform a write operation in the codebase of your app and verify that an event writes to the console every time a change replicates to the Big Peer:

The following snippets demonstrate upserting a new value to the Ditto store in various languages:

JS
|
const docID = await ditto.store.collection('people').upsert({
  name: 'Susan',
  age: 31,
})
console.log(docID) // "507f191e810c19729de860ea"

cURL
|
curl -X POST 'https://<CLOUD_ENDPOINT>/api/v4/store/write' \
  --header 'Content-Type: application/json' \
  --data-raw '{
      "commands": [{
        "method": "upsert",
        "collection": "people",
        "id": "abc123",
        "value": {
          "name": "Susan", "age": 31
        }
      }]
  }'

3. If you want to react to a consumable event by writing changes to the Ditto store, use the HTTP API. For more information, see the Events Overview below and the HTTP API Reference.

Events Overview

Once your consumer is active, events appear in the console whenever a change replicates to the Big Peer.

If you want to respond to a consumable event by triggering changes in the Ditto store, invoke the Ditto HTTP API. For more information, see HTTP API Reference.
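
Concretely, a Java consumer might dispatch on the event types described in the rest of this section as sketched below. This is a minimal, hypothetical example rather than part of the connector: it assumes each Kafka record value is a single JSON event and uses the Jackson library for parsing, but any JSON library works.

Java
|
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DittoEventDispatcher {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // recordValue is the value of one Kafka record produced by the Ditto connector.
    public static void handle(String recordValue) throws Exception {
        JsonNode event = MAPPER.readTree(recordValue);
        switch (event.path("type").asText()) {
            case "documentChanged": {
                JsonNode change = event.path("change");
                System.out.printf("txn %d: %s on collection %s%n",
                        event.path("txnId").asLong(),
                        change.path("method").asText(), // "upsert" or "remove"
                        event.path("collection").asText());
                break;
            }
            case "requeryRequired":
                // See "Responding to Consumer Streaming Failures" below.
                System.out.println("requery required, txn " + event.path("txnID").asLong());
                break;
            default:
                System.out.println("unhandled event: " + recordValue);
        }
    }
}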

New Document

When a new document is created, the method is upsert and the change.oldValue property is always null.

The txnId field is a timestamp for when the Big Peer internally replicates data modifications from Small Peers. This timestamp is an always-increasing value.

The following snippet is an example of a JSON event stream that appears in your Kafka console when a new document is upserted to Ditto:

Text
|
{
  "txnId": 552789,
  "type": "documentChanged",
  "collection": "people",
  "change": {
    "method": "upsert",
    "oldValue": null,
    "newValue": {
      "_id": "6213e9c90012e4af0017cb9f",
      "date": 1645472201,
      "name": "Susan",
      "age": 31
    }
  }
}


Update Document

The following table provides an overview of the JSON event stream that displays in the Kafka console when an upsert method executes and modifies a document:

| Field Property | Description |
| --- | --- |
| change.oldValue | Contains the previous version of the document. |
| change.newValue | Once the upsert operation completes, contains the complete document: both the fields that changed and the fields that remain unchanged. |

For instance, the following snippet shows changes to document 6213e9c90012e4af0017cb9f:

Text
|
{
  ownedCars: 0, 
  friends: [], 
  name: "Frank"
}


Once the update operation executes the changes to the document, the following JSON event stream appears in your Kafka console:

Text
|
{
  "txnId": 553358,
  "type": "documentChanged",
  "collection": "people",
  "change": {
    "method": "upsert",
    "oldValue": {
      "_id": "6213e9c90012e4af0017cb9f",
      "date": 1645472312,
      "name": "Susan",
      "age": 31
    },
    "newValue": {
      "_id": "6213e9c90012e4af0017cb9f",
      "date": 1645472312,
      "name": "Frank",
      "age": 31,
      "ownedCars": 0,
      "friends": []
    }
  }
}
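
Because change.newValue always contains the complete document, a consumer that only cares about which fields changed can diff it against change.oldValue. The following is a minimal sketch using the Jackson library; the class and method names are illustrative only:

Java
|
import java.util.Iterator;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;

public class DocumentDiff {
    // Prints fields whose values differ between change.oldValue and change.newValue.
    // For the event above, this reports "name" as changed and "ownedCars" and "friends" as added.
    public static void printChangedFields(JsonNode oldValue, JsonNode newValue) {
        Iterator<Map.Entry<String, JsonNode>> fields = newValue.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            JsonNode previous = (oldValue == null || oldValue.isNull())
                    ? null : oldValue.get(field.getKey());
            if (previous == null) {
                System.out.println("added:   " + field.getKey() + " = " + field.getValue());
            } else if (!previous.equals(field.getValue())) {
                System.out.println("changed: " + field.getKey() + " " + previous + " -> " + field.getValue());
            }
        }
    }
}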


Remove Data

When a document is removed using the remove method, the change.value field in the JSON event stream that appears in your Kafka console contains the full document at the time of its removal, as follows:

Text
|
{
  "txnId": 701251,
  "type": "documentChanged",
  "collection": "people",
  "change": {
    "method": "remove",
    "value": {
      "_id": "6213e9c90012e4af0017cb9f",
      "date": 1645468039,
      "name": "Susan",
      "age": 31 
    }
  }
}


RequeryRequired

When the consumer stream fails to keep up with incoming changes or there are gaps in the data, a requeryRequired event type appears in your Kafka console. For example:

Text
|
{
  "txnID": 45,
  "type": "requeryRequired",
  "documents": [{
    "appId": "abc",
    "collectionName": "people",
    "documentId": "6213e9c90012e4af0017cb9f"
  }]
}


Responding to Consumer Streaming Failures

If you receive a requeryRequired event, catch your system up by calling the HTTP RPC API endpoint for each document listed in the event.

As demonstrated in the following snippet, do the following in each of your API calls:

  • Use the txnID from the requeryRequired event as the transaction_id in your query.
  • Set the value of the X-DITTO-TXN-ID header to that transaction_id.
Curl
|
curl -X POST 'https://<api_endpoint>/your_query_endpoint' \
--header 'Content-Type: application/json' \
--header 'X-DITTO-TXN-ID: <transaction_id>' \
--data-raw '{
    "documentId": "<document_id>"
}'
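
For a Java consumer, the equivalent request can be made with the JDK's built-in HTTP client (Java 11 or later). This is a sketch mirroring the placeholders in the cURL example above; replace the endpoint, transaction ID, and document ID with your own values before running:

Java
|
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RequerySketch {
    public static void main(String[] args) throws Exception {
        // Placeholders mirror the cURL example above and must be replaced before running.
        String endpoint = "https://<api_endpoint>/your_query_endpoint";
        String txnId = "<transaction_id>";   // txnID from the requeryRequired event
        String documentId = "<document_id>"; // one entry from the event's documents array

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(endpoint))
                .header("Content-Type", "application/json")
                .header("X-DITTO-TXN-ID", txnId)
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{ \"documentId\": \"" + documentId + "\" }"))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}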



