Sync, Transform, & Migrate data in Real-Time from PostgreSQL to MongoDB w/ Meroxa

By Tanveet Gill

 13 Dec 2022

PostgreSQL to MongoDB

Video Tutorial (1 minute)

GitHub Repo: meroxa/turbine-examples/javascript/user-demo/

💡 To see how to move data out of Mongo to any data destination, check out our blog post here: https://meroxa.com/blog/streaming-changes-in-real-time-from-mongodb-to-apache-kafka

This blog post covers using MongoDB as a downstream destination. We will be moving data in real-time from PostgreSQL to MongoDB. Meroxa tracks every change in your PostgreSQL database and applies those CREATE, UPDATE, and DELETE operations to MongoDB, keeping both in sync.
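To make the idea of "keeping both in sync" concrete, here is a minimal, hypothetical sketch of what applying change events means: each event carries an operation (create, update, or delete) plus the latest row state, and the destination is updated to match. The event shape ({ op, key, after }) and the in-memory Map standing in for the MongoDB collection are assumptions for illustration only; they are not Meroxa's actual record format.

```javascript
// Hypothetical change-event shape: { op: 'create' | 'update' | 'delete', key, after }
// A Map stands in for the MongoDB collection, keyed by the source row's primary key.
function applyChange(collection, event) {
  switch (event.op) {
    case 'create':
    case 'update':
      // Upsert: store the latest row state under its key
      collection.set(event.key, { ...event.after });
      break;
    case 'delete':
      // Remove the document that mirrors the deleted row
      collection.delete(event.key);
      break;
    default:
      throw new Error(`Unknown operation: ${event.op}`);
  }
  return collection;
}

// Replay a sequence of changes as they might arrive from PostgreSQL
const userCopy = new Map();
const changes = [
  { op: 'create', key: 1, after: { id: 1, name: 'Ada' } },
  { op: 'create', key: 2, after: { id: 2, name: 'Grace' } },
  { op: 'update', key: 1, after: { id: 1, name: 'Ada Lovelace' } },
  { op: 'delete', key: 2 },
];
for (const change of changes) applyChange(userCopy, change);

console.log([...userCopy.values()]); // [ { id: 1, name: 'Ada Lovelace' } ]
```

Treating creates and updates as upserts keeps the replay idempotent: applying the same event twice leaves the collection in the same state.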

Migrating data from PostgreSQL to MongoDB, or vice versa, can be a time-consuming process. With Meroxa you can do it in just a few lines of code. In this blog post we will keep our PostgreSQL database in sync with our MongoDB Atlas instance. We will also briefly cover how you can transform the data going into MongoDB in real-time.

While this post covers getting data into MongoDB, you can also pull data out of MongoDB to any data destination by doing the reverse of what's covered here (here's a blog post on moving data from MongoDB to Apache Kafka in real-time).

What is Meroxa?

Meroxa is a streaming application platform where developers can run their Turbine applications. Meroxa handles the underlying streaming infrastructure so that developers can focus on building their applications. Turbine applications start with an upstream resource. Once that upstream resource is connected, Meroxa will handle streaming the data into the Turbine application for execution.

What is Turbine?

Turbine is a stream processing application framework for building event-driven data apps that respond to data in real-time and scale using cloud-native best practices. There is no bespoke domain-specific language (DSL) to learn.

You can even see how your app reacts to data by running your Turbine data applications locally. We show you exactly what will happen in production, so you can iterate and develop faster without having to deploy.

You can write your Turbine data apps using Go, JavaScript, Python, or Ruby.

💡 If you prefer another language, support for many more languages is coming. To suggest one, join our community or write to support@meroxa.com.

How it works

In this example, the Turbine app creates a CDC (Change Data Capture) connector from the platform to a PostgreSQL database (this could be any database) and then writes that data to MongoDB Atlas.


Here's what happens and what we can do to stream and transform our data:

  • The PostgreSQL connector receives changes in real-time and publishes them in the form of a stream.
  • Inside our Turbine app we can write functions to transform and manipulate that data. We can do anything we would normally do in a programming language, such as calling APIs or importing packages and libraries, to change that data.
  • The Meroxa Platform then streams that data to MongoDB in real-time, without you, the developer, having to worry about scalability, flexibility, or schemas.
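The three steps above follow a familiar source → transform → sink shape. The sketch below models that shape with plain JavaScript async generators; the function names (changeStream, normalizeEmail, runPipeline) are hypothetical and only illustrate the flow. On Meroxa, the platform runs the real pipeline for you.

```javascript
// Hypothetical source: emits change records one at a time, like a stream
async function* changeStream(rows) {
  for (const row of rows) {
    yield row;
  }
}

// Transform step: ordinary JavaScript (here, normalizing email addresses)
function normalizeEmail(record) {
  return { ...record, email: record.email.toLowerCase() };
}

// Consume the stream, transform each record, and hand it to the sink
async function runPipeline(source, transformFn, sink) {
  for await (const record of source) {
    await sink(transformFn(record));
  }
}

// An array stands in for the MongoDB collection
const destination = [];
runPipeline(
  changeStream([
    { id: 1, email: 'Ada@Example.com' },
    { id: 2, email: 'GRACE@example.com' },
  ]),
  normalizeEmail,
  async (doc) => {
    destination.push(doc);
  }
).then(() => console.log(destination.map((d) => d.email)));
// prints [ 'ada@example.com', 'grace@example.com' ]
```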

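Requirements

  • A Meroxa account
  • The Meroxa CLI
  • A PostgreSQL database (the source)
  • A MongoDB Atlas instance (the destination)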

Setup

Once you have signed up for Meroxa and set up the Meroxa CLI, follow these four steps to get up and running:

💡 Here we create the resources via the CLI; you can also create them in the Meroxa Dashboard once you are logged in.

  1. Adding your PostgreSQL and MongoDB Atlas Resources

    PostgreSQL (Guide on configuring your PostgreSQL) - Source Resource

    Below we are creating a PostgreSQL connection to Meroxa named pg_db.

    Note: To support CDC (Change Data Capture), we set the logical_replication metadata flag to true.

    $ meroxa resource create pg_db \
    --type postgres \
    --url "postgres://$PG_USER:$PG_PASS@$PG_URL:$PG_PORT/$PG_DB" \
    --metadata '{"logical_replication":"true"}'
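    If your database password contains characters such as @, : or /, it generally needs to be percent-encoded before it goes into the connection URL, otherwise the URL can be split in the wrong place. Below is a small Node.js sketch that builds the --url value from the same environment variables used above (PG_USER, PG_PASS, PG_URL, PG_PORT, PG_DB); the helper name buildPostgresUrl and the fallback values are hypothetical.

```javascript
// Build a PostgreSQL connection URL, percent-encoding the credentials so that
// characters like '@', ':' or '/' in a password don't break URL parsing.
function buildPostgresUrl({ user, pass, host, port, db }) {
  const credentials = `${encodeURIComponent(user)}:${encodeURIComponent(pass)}`;
  return `postgres://${credentials}@${host}:${port}/${encodeURIComponent(db)}`;
}

// Read the same variables used in the CLI command above
// (the fallback values are placeholders for illustration)
const pgUrl = buildPostgresUrl({
  user: process.env.PG_USER ?? 'meroxa',
  pass: process.env.PG_PASS ?? 'p@ss:word',
  host: process.env.PG_URL ?? 'db.example.com',
  port: process.env.PG_PORT ?? '5432',
  db: process.env.PG_DB ?? 'app',
});
console.log(pgUrl);
```

    With the placeholder values, the password p@ss:word becomes p%40ss%3Aword in the printed URL, which you can then pass to --url.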

    MongoDB Atlas (Guide on setting up MongoDB Atlas) - Destination Resource

    Below we are creating a MongoDB Atlas connection named mdb.

    $ meroxa resource create mdb \
    --type mongodb \
    --url "mongodb+srv://$MONGO_USER:$MONGO_PASS@$MONGO_URL/$MONGO_DATABASE_NAME"
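    The destination URL ends with the database name ($MONGO_DATABASE_NAME) that the user_copy collection from step 3 will be written to, so a missing or mistyped path is easy to overlook. As a hedged sketch, Node's built-in WHATWG URL class can sanity-check a mongodb+srv:// string before you pass it to the CLI; the helper name checkMongoUrl and the checks it performs are our own, not part of the Meroxa CLI.

```javascript
// Sanity-check a MongoDB Atlas SRV connection string before creating the resource.
// Returns the host and database name, or throws with a descriptive message.
function checkMongoUrl(connectionString) {
  const parsed = new URL(connectionString);
  if (parsed.protocol !== 'mongodb+srv:') {
    throw new Error(`Expected mongodb+srv://, got ${parsed.protocol}//`);
  }
  if (!parsed.username || !parsed.password) {
    throw new Error('Missing username or password');
  }
  // The path holds the database name, e.g. "/users_db"; query options are ignored
  const database = decodeURIComponent(parsed.pathname.replace(/^\//, ''));
  if (!database) {
    throw new Error('Missing database name after the host');
  }
  return { host: parsed.hostname, database };
}

console.log(checkMongoUrl('mongodb+srv://user:secret@cluster0.example.mongodb.net/users_db'));
// prints { host: 'cluster0.example.mongodb.net', database: 'users_db' }
```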
    
  2. Initializing our Turbine app

    $ meroxa apps init postgres-to-mongo --lang js

    This will create a directory called postgres-to-mongo with some boilerplate code to get you started.

  3. Coding our Turbine app

    Open your postgres-to-mongo folder in your preferred IDE. Let's wire up the upstream and downstream resources we created in step 1.

    async run(turbine) {
      // First, identify your PostgreSQL source name as configured in Step 1
      // In our case we named it pg_db
      let source = await turbine.resources("pg_db");
    
      // Second, specify the table you want to access in your PostgreSQL DB
      let records = await source.records("User");
    
      // Third, process each record that comes in!
      let processed = await turbine.process(records, this.processData);
    
      // Fourth, identify your MongoDB destination resource configured in Step 1
      let destination = await turbine.resources("mdb");
    
      // Finally, specify which "collection" in mongo to write to. If none exists, it will be created
      await destination.write(processed, "user_copy");
    }
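    Because run only talks to the turbine object through a few calls (resources, records, process, and write), you can exercise it in a unit test with a stand-in object. The sketch below is a hypothetical in-memory mock that mirrors just the calls used above; createMockTurbine, MockRecord, and MockRecords are our own names and are not part of the Turbine SDK.

```javascript
// Hypothetical in-memory stand-ins for the Turbine objects used in run().
// They mimic only the calls made above and are NOT the real Turbine SDK.
class MockRecord {
  constructor(data) {
    this.data = { ...data };
  }
  get(key) {
    return this.data[key];
  }
  set(key, value) {
    this.data[key] = value;
  }
}

// An array of records that also has the unwrap() method processData calls
class MockRecords extends Array {
  unwrap() {
    // No-op here: the mock records are already plain rows
  }
}

function createMockTurbine(tables) {
  const written = {};
  return {
    written,
    async resources(name) {
      return {
        // Source side: wrap each row of the requested table in a MockRecord
        async records(table) {
          return MockRecords.from(tables[table] ?? [], (row) => new MockRecord(row));
        },
        // Destination side: store plain rows under "<resource>.<collection>"
        async write(records, collection) {
          written[`${name}.${collection}`] = Array.from(records, (r) => r.data);
        },
      };
    },
    // Call the processing function directly
    async process(records, fn) {
      return fn(records);
    },
  };
}

// A copy of run() from above, plus a trivial processData for the demo
class App {
  async run(turbine) {
    const source = await turbine.resources('pg_db');
    const records = await source.records('User');
    const processed = await turbine.process(records, this.processData);
    const destination = await turbine.resources('mdb');
    await destination.write(processed, 'user_copy');
  }

  processData(records) {
    for (const record of records) {
      record.set('synced', true);
    }
    records.unwrap();
    return records;
  }
}

const turbine = createMockTurbine({ User: [{ id: 1, name: 'Ada' }] });
new App().run(turbine).then(() => console.log(turbine.written['mdb.user_copy']));
```

    In a real test you would import your own App class instead of the copy shown here.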

    In our processData function we just log the time each record is processed. However, this function can transform your records in any way you like, such as calling an API, manipulating data, or enriching data. The code below includes a few commented examples; helpers such as formatPhone, googleMapsLookup, and generateAddressObject are placeholders you would implement or import yourself.

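    // At the top of your app file: Node's built-in crypto module
    const crypto = require('crypto');

    // Inside your App class. Declared async because the enrichment step
    // below uses await.
    async processData(records) {
      for (const record of records) {
        const dateTimeGmt = new Date().toUTCString();
        console.log(`[DEBUG] Streaming Record To Destination: ${dateTimeGmt}`);
    
        // Hash sensitive data (here with Node's built-in crypto module;
        // a 3rd party library or package works too)
        record.set(
          'secretcode',
          crypto.createHash('sha256').update(String(record.get('secretcode'))).digest('hex')
        );
    
        // Format data via a custom function
        // (formatPhone is a placeholder helper you would define yourself)
        record.set('phone_number', formatPhone(record.get('phone_number')));
    
        // Enrich data via an API
        // (googleMapsLookup and generateAddressObject are placeholder helpers)
        const addressLookupResults = await googleMapsLookup(record.get('address'));
        const addressMetaData = generateAddressObject(addressLookupResults);
        record.set('address_metadata', addressMetaData);
      }
    
      records.unwrap();
      return records;
    }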
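    Because these transformations are plain JavaScript, it can help to keep them in small pure functions that you can unit-test without Turbine and then call from processData. A sketch using Node's built-in crypto module for the hash; formatPhone here is one hypothetical 10-digit formatting rule and transformUser is an illustrative name, neither comes from a library.

```javascript
const crypto = require('crypto');

// Hash a value with SHA-256 and return the hex digest
function sha256(value) {
  return crypto.createHash('sha256').update(String(value)).digest('hex');
}

// Hypothetical formatter: turn 10 digits into "(555) 123-4567",
// leaving anything it doesn't recognize unchanged
function formatPhone(raw) {
  const digits = String(raw).replace(/\D/g, '');
  if (digits.length !== 10) return raw;
  return `(${digits.slice(0, 3)}) ${digits.slice(3, 6)}-${digits.slice(6)}`;
}

// Apply both transformations to a plain row object without mutating it
function transformUser(row) {
  return {
    ...row,
    secretcode: sha256(row.secretcode),
    phone_number: formatPhone(row.phone_number),
  };
}

console.log(transformUser({ id: 1, secretcode: 'abc', phone_number: '555.123.4567' }));
```

    Inside processData you would then call, for example, record.set('phone_number', formatPhone(record.get('phone_number'))).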

    💡 For a more detailed example of using APIs and doing transformations in Turbine, you can read our blog post here.

  4. Deploying Your Application

    Commit your changes

    $ git add .
    $ git commit -m "Initial Commit"

    Deploy your app

    $ meroxa apps deploy

    💡 To visualize your deployed application, you can check out an overview of our Turbine visualizations here.

    Once your app is deployed you will see the PostgreSQL data populate in the user_copy collection in MongoDB Atlas. As records or changes come into your data source (PostgreSQL in this example), your Turbine app running on the Meroxa platform will process each record in real-time!

    Meroxa will set up all the connections and remove the complexities, so you, the developer, can focus on the important stuff.

    Have questions or feedback?

    If you have questions or feedback, reach out directly by joining our community or by writing to support@meroxa.com.

    Happy Coding 🚀


Tanveet Gill


Solutions Architect @ Meroxa