Writing data into Apache Kafka can be tedious for any data developer. If your applications insert data into your database in real time and you want to act on that data by moving it into a Kafka Topic, Meroxa can help you do that in a few lines of code.
Overview
Here we will show an example of multiple applications inserting data into a PostgreSQL database, where we then use Meroxa to stream that data over to a Kafka Topic instantly as records get inserted.
Data flows from your applications to PostgreSQL, and Meroxa then comes in to stream it in real time to your Kafka Topic.
Take Me To The Code!
In this example, we will use the JavaScript Turbine framework to get records from PostgreSQL and write them to your Kafka Topic.
💡 If you prefer to use another language, Meroxa supports Go, Python, and Ruby as well with many more coming!
Requirements
- Meroxa account
- Meroxa CLI
- Meroxa supported PostgreSQL DB
- Apache Kafka/Confluent Cloud Credentials
- Node.js
Once you have signed up for Meroxa and set up the Meroxa CLI, follow the steps below to get up and running:
💡 Here we are creating the resources via the CLI. You can also do so via the Meroxa Dashboard once you are logged in.
1. Adding your PostgreSQL and Kafka Topic Resources
PostgreSQL (Guide on configuring your Postgres) - Source Resource
Below we are creating a PostgreSQL connection to Meroxa named pg_db.
Note: To support CDC (Change Data Capture) we turn on the logical_replication flag.
    $ meroxa resource create pg_db \
      --type postgres \
      --url postgres://$PG_USER:$PG_PASS@$PG_URL:$PG_PORT/$PG_DB \
      --metadata '{"logical_replication":"true"}'
Kafka (Guide on setting up Confluent Cloud/Kafka) - Destination Resource
Below we are creating a Kafka connection named apachekafka.
    $ meroxa resource create apachekafka \
      --type kafka \
      --url "kafka+sasl+ssl://<USERNAME>:<PASSWORD>@<BOOTSTRAP_SERVER>?sasl_mechanism=plain"
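Both --url values above embed credentials directly in the URL, so a password or API secret containing characters such as "@", "/" or "+" can break the URL structure. The sketch below shows one way to assemble the two URLs with percent-encoded credentials. The helper names buildPostgresUrl and buildKafkaUrl and all sample values are hypothetical, and it assumes the platform decodes percent-encoded credentials the way standard URL parsers do.

```javascript
// Hypothetical helpers, not part of the Meroxa CLI: they only build strings
// that can be pasted into the --url flag.

// Percent-encode credentials so characters such as "@", "/" or "+" cannot
// be mistaken for URL delimiters.
function buildPostgresUrl({ user, password, host, port, database }) {
  const auth = `${encodeURIComponent(user)}:${encodeURIComponent(password)}`;
  return `postgres://${auth}@${host}:${port}/${database}`;
}

function buildKafkaUrl({ username, password, bootstrapServer }) {
  const auth = `${encodeURIComponent(username)}:${encodeURIComponent(password)}`;
  return `kafka+sasl+ssl://${auth}@${bootstrapServer}?sasl_mechanism=plain`;
}

// Placeholder values for illustration only.
const pgUrl = buildPostgresUrl({
  user: "app",
  password: "p@ss/w+rd",
  host: "db.example.com",
  port: 5432,
  database: "customers",
});
const kafkaUrl = buildKafkaUrl({
  username: "API_KEY",
  password: "s3cr/et+",
  bootstrapServer: "broker.example.com:9092",
});

console.log(pgUrl);
// postgres://app:p%40ss%2Fw%2Brd@db.example.com:5432/customers
console.log(kafkaUrl);
// kafka+sasl+ssl://API_KEY:s3cr%2Fet%2B@broker.example.com:9092?sasl_mechanism=plain
```

If your credentials contain only letters, digits, and characters like "-" or "_", the encoded URL is identical to the plain one, so the commands above work unchanged.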
2. Initializing Turbine
$ meroxa apps init meroxa-kafka --lang js
3. Writing The 4 Lines Of Code
Open up your meroxa-kafka folder in your preferred IDE. You will get boilerplate code that explains where to reference the source and destination resources you named in Step 1. In our case we just need to do the following:
    exports.App = class App {
      async run(turbine) {
        // First, identify your PostgreSQL source name as configured in Step 1
        // In our case we named it pg_db
        let source = await turbine.resources("pg_db");

        // Second, specify the table you want to access in your PostgreSQL DB
        let records = await source.records("customer_data_table");

        // Optional, process each record that comes in!
        // let transformed = await turbine.process(records, this.transform);

        // Third, identify your Kafka/Confluent destination name configured in Step 1
        let destination = await turbine.resources("apachekafka");

        // Finally, specify which Topic to write that data to
        await destination.write(records, "customer_data_topic");
      }
    };
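The optional turbine.process line, commented out in the code above, takes a function that runs over the incoming records. Below is a minimal sketch of what such a function might look like, runnable on its own without a Meroxa account. It assumes the function receives the collection of records and returns it, and that each record exposes get and set accessors. The FakeRecord class and the "email" column are hypothetical stand-ins for illustration, not part of Turbine.

```javascript
// Hypothetical stand-in for a Turbine record, used only so this sketch runs
// on its own. The get/set accessors are an assumption about the record shape.
class FakeRecord {
  constructor(payload) {
    this.payload = { ...payload };
  }
  get(field) {
    return this.payload[field];
  }
  set(field, value) {
    this.payload[field] = value;
  }
}

// A transform in the shape passed to turbine.process(records, this.transform):
// it takes the records, modifies each one, and returns them.
function transform(records) {
  records.forEach((record) => {
    const email = record.get("email"); // "email" is a hypothetical column
    if (typeof email === "string") {
      // Normalize the value before it is written to the Kafka topic
      record.set("email", email.trim().toLowerCase());
    }
  });
  return records;
}

// Local dry run with fake data; no Meroxa resources are involved.
const sample = [
  new FakeRecord({ id: 1, email: "  Ada@Example.COM " }),
  new FakeRecord({ id: 2, email: null }),
];
const result = transform(sample);
console.log(result[0].get("email")); // ada@example.com
```

In the app itself, the function would live as a transform method on the App class, and the final write would receive the transformed records instead of the raw ones.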
💡 await turbine.process allows developers to write a function that will be run on each record. If you need to pre-process your data before sending it to your Kafka topic, you can write your code here.
4. Deploying Your App
Commit your changes
    $ git add .
    $ git commit -m "Initial Commit"
Deploy your app
$ meroxa apps deploy
Once your app is deployed, you will see your Kafka Topic populate with all the data from the PostgreSQL table. You can also insert a record into your table to see it stream over live in Confluent Cloud!
Meroxa sets up all the connections and removes the complexities, so you, the developer, can focus on the important stuff.
Have questions or feedback?
If you have questions or feedback, reach out directly by joining our community or by writing to support@meroxa.com.
Happy Coding 🚀