Real-Time Data Streaming from PostgreSQL to Apache Kafka in 4 Lines of Code w/ CDC

By  Tanveet Gill

 8 Nov 2022

Writing data into Apache Kafka can become a tedious task for any data developer. If you’ve ever had a situation where your applications insert data in real-time to your database and you want to take actions on that data by moving it into a Kafka Topic, then Meroxa can help you do that in a few lines of code.

Overview

Here we will show an example of multiple applications inserting data into a PostgreSQL database, where we then use Meroxa to stream that data over to a Kafka Topic instantly as records get inserted.

Below we can see how data flows from your Applications to PostgreSql and then where Meroxa comes in to stream it in real-time to your Kafka Topic

Flowcharts

Take Me To The Code!

In this example, we will use the Javascript Turbine framework to get records from PostgreSQL and write them to your Kafka Topic.

💡 If you prefer to use another language, Meroxa supports Go, Python, and Ruby as well with many more coming!

Requirements

Once you have signed up for Meroxa and set up the Meroxa CLI you can follow the following steps to get up and running:

💡 Here we are creating the resources via the CLI, you can also do so via the Meroxa Dashboard once you are logged in.

  1. Adding your PostgreSQL and Kafka Topic Resources

    PostgreSQL (Guide on configuring your Postgres) - Source Resource

    Below we are creating a PostgreSQL connection to Meroxa named pg_db.

    Note: To support CDC (Change Data Capture) we turn on the logical_replication flag.

    $ meroxa resource create pg_db \\\\
      --type postgres \\\\
      --url postgres://$PG_USER:$PG_PASS@$PG_URL:$PG_PORT/$PG_DB \\\\
      --metadata '{"logical_replication":"true"}'

    Kafka (Guide on setting up Confluent Cloud/Kafka) - Destination Resource

    Below we are creating a Kafka connection named apachekafka.

    $ meroxa resource create apachekafka \\    --type kafka \\    --url "kafka+sasl+ssl://<USERNAME>:<PASSWORD>@<BOOTSTRAP_SERVER>?sasl_mechanism=plain" \\  
  2. Initializing Turbine

    $ meroxa apps init meroxa-kafka --lang js  
  3. Writing The 4 Lines Of Code

    Open up your meroxa-kafka folder in your preferred IDE. You will get boilerplate code that explains where to code your sources and destinations named in Step 1. In our case we just need to do the following:

    exports.App = class App {
      async run(turbine) {
        // First, identify your PostgreSQL source name as configured in Step 1
        // In our case we named it pg_db
        let source = await turbine.resources("pg_db");
    
        // Second, specify the table you want to access in your PostgreSQL DB
        let records = await source.records("customer_data_table");
    
        // Optional, Process each record that comes in!
        // let transformed = await turbine.process(records, this.transform);
    
        // Third, identify your Kafka/Confluent source name configured in Step 1
        let destination = await turbine.resources("apachekafka");
    
        // Finally, specify which Topic to write that data to
        await destination.write(records, "customer_data_topic");
      }
    };

    💡 await turbine.process allows developers to write a function that will be run on each record. If you need to pre-process your data before sending it to your Kafka topic you can write your code here.

  4. Deploying Your App

    Commit your changes

    $ git add .  $ git commit -m "Initial Commit"  

    Deploy your app

    $ meroxa apps deploy  

Once your app is deployed you will see your Kafka Topic populate all the data from the PostgreSQL table. You can also insert a record to your table to see it stream over live in Confluent Cloud!

Meroxa will set up all the connections and remove the complexities, so you the developer can focus on the important stuff.

Have questions or feedback?

If you have questions or feedback, reach out directly by joining our community or by writing to support@meroxa.com.

Happy Coding 🚀

     Meroxa, Streaming Application, Data Streaming, Real-time, Apache Kafka, PostgreSQL

Tanveet Gill

Tanveet Gill

Solutions Architect @ Meroxa