Kafka Events: Node.js
In this article we will listen to Kafka events and pipe them to commandline process stdout.
For more information about Kafka, see Change Data Capture.
- Basic understanding of Node.js.
See the github repository for example code to connect a Node.js instance to the Ditto Big peer as a Kafka sink to MongoDB.
Converting Certificates to the Proper Formats
First, you must download the proper Kafka certficiates and convert them to the format required by SSL via Node.js.
Convert the .p12 files to the required user.key, cluster.crt, and user.crt files. When propmted, use the appropriate cluster certficiate password or user password as described in the portal.
Decoding Transactions
All messages from the Ditto CDC are sent to your Kafka sink as JSON. First, you must decode the transaction as JSON.
Each transaction has a type — Ditto has two types, requeryRequired and documentChanged.
Parsing documentChanged Events
For the onDocumentChanged function, we will parse the event into one of three possible types: Insert, Update, and Remove.
When change.oldValue is equal to null, that means that a new document was inserted into the database.
If change.oldValue has a value, that means that a document with the corresponding _id was updated to the value indicated in change.newValue.
When change.method is equal to "remove", then the document has been deleted from Ditto.
Parsing requeryRequired Event
Send an HTTP request to tell the Ditto Big Peer to catch up to the given transaction id as part of transaction.txnId and the given collection. Your HTTP endpoint will look like https://${APP_ID}.cloud.ditto.live.