Real-Time Data Enrichment for Data Activation Using Meroxa Turbine and Clearbit

By DeVaris Brown

4 Aug 2022

Data activation, or reverse ETL, is the process of pulling data from your data warehouse and making it actionable by your business users in their preferred tooling. One of the main ingredients for data activation is data enrichment. Data enrichment enhances existing data by supplementing missing or incomplete data with information from internal or external sources.
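As a minimal illustration of that definition, the Go sketch below fills in only the fields a record is missing, using a second source. The `Record` type and the `enrich` function are hypothetical names chosen for this example; they are not part of Turbine or Clearbit.

```go
package main

import "fmt"

// Record is a hypothetical flat record: field name -> value.
type Record map[string]any

// enrich returns a copy of base in which every field that is absent,
// nil, or an empty string is filled from extra. Values already present
// in base are never overwritten.
func enrich(base, extra Record) Record {
	out := make(Record, len(base)+len(extra))
	for k, v := range base {
		out[k] = v
	}
	for k, v := range extra {
		if cur, ok := out[k]; !ok || cur == nil || cur == "" {
			out[k] = v
		}
	}
	return out
}

func main() {
	base := Record{"email": "devaris@meroxa.io", "company": ""}
	extra := Record{"company": "Meroxa", "role": "leadership", "email": "other@example.com"}

	// The email stays as it was; company and role come from extra.
	fmt.Println(enrich(base, extra))
}
```

Returning a copy rather than mutating `base` keeps the original record available for comparison, which is handy when checking what an enrichment step actually changed.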

The diagram below shows a typical architecture for data activation. Once a data record reaches the warehouse, a service acts on that record, enriches it with internal or external data, and places it in whatever destination a stakeholder needs.

[Diagram: typical data activation architecture]

The data activation pattern can be used for a number of use cases, including the following:

Customer Service - Gather customer details, support history, and purchase activity all in one place to provide a more tailored experience

Sales - Access more detailed information about leads and their engagement activity to increase conversions and renewals

Marketing - Create personalized and targeted campaigns based on activity to improve lead generation efforts

Using Meroxa to Simplify and Turbocharge Data Activation

By using Meroxa’s Turbine Application Framework, you can simplify the data activation process by replacing multiple point solutions for transformation and reverse ETL with code.

[Diagram: a Meroxa Turbine data app sitting between data sources and destinations]

In the above diagram, the Meroxa Turbine data app cleans and enriches events from various data sources in real time, so the data is already in a consumable format when it reaches the destination. This saves data-driven organizations considerable amounts of money, resources, and time.

Show Me the Code!

In this example, we use Go to pull records from a PostgreSQL database, enrich each record, and write it to another table in the same PostgreSQL database. The destination can be any resource Meroxa officially supports, including Snowflake, S3, Salesforce, and others.

💡 If you want to skip the tutorial to see the full example, check out the GitHub repo.

Requirements

Adding a PostgreSQL Resource to the Meroxa Catalog

The first step in creating a data app is to add the PostgreSQL resource to the Meroxa catalog. If your database supports logical replication, set the logical_replication metadata value to true.

$ meroxa resource create pg_db \
  --type postgres \
  --url postgres://$PG_USER:$PG_PASS@$PG_URL:$PG_PORT/$PG_DB \
  --metadata '{"logical_replication":"true"}'
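The `--url` value above interpolates credentials straight into a connection string, which can break if the password contains characters such as `@` or `/`. The following sketch, using only Go's standard library, shows one way to build a properly escaped URL before handing it to the CLI. The `postgresURL` helper and the sample values are hypothetical.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// postgresURL assembles a postgres:// connection URL and percent-encodes
// the user name and password so reserved characters do not corrupt it.
func postgresURL(user, pass, host, port, db string) string {
	u := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(user, pass),
		Host:   net.JoinHostPort(host, port),
		Path:   "/" + db,
	}
	return u.String()
}

func main() {
	// Sample values only; the password deliberately contains '@' and '/'.
	fmt.Println(postgresURL("app", "p@ss/word", "db.example.com", "5432", "appdb"))
	// prints postgres://app:p%40ss%2Fword@db.example.com:5432/appdb
}
```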

Initializing a Turbine Data App

$ meroxa apps init meroxa-clearbit --lang golang  

When you initialize the Turbine app, you’ll see we include a ton of comments and boilerplate to help you get up and going. We’ll be removing most of this for this example, but take a look around and even execute meroxa apps run to see the output of our sample app.

Clearbit Helper Function

The helper below uses the clearbit-go package to call Clearbit’s combined enrichment API. It takes an email address, looks up the associated person and company, and returns the fields we care about in a UserDetails struct.

package main

import (
	"log"
	"os"

	"github.com/clearbit/clearbit-go/clearbit"
)

// UserDetails holds the subset of Clearbit fields this app uses.
type UserDetails struct {
	FullName        string
	Location        string
	Role            string
	Seniority       string
	Company         string
	GithubUser      string
	GithubFollowers int
}

// EnrichUserEmail looks up the person and company associated with an
// email address through Clearbit's combined enrichment API.
func EnrichUserEmail(email string) (*UserDetails, error) {
	key := os.Getenv("CLEARBIT_API_KEY")
	client := clearbit.NewClient(clearbit.WithAPIKey(key))

	results, resp, err := client.Person.FindCombined(clearbit.PersonFindParams{
		Email: email,
	})
	if err != nil {
		// resp may be nil if the request never completed, so guard before reading it.
		if resp != nil {
			log.Printf("error looking up email; resp: %+v", resp.Status)
		}
		return nil, err
	}

	return &UserDetails{
		FullName:        results.Person.Name.FullName,
		Location:        results.Person.Location,
		Role:            results.Person.Employment.Role,
		Seniority:       results.Person.Employment.Seniority,
		Company:         results.Company.Name,
		GithubUser:      results.Person.GitHub.Handle,
		GithubFollowers: results.Person.GitHub.Followers,
	}, nil
}
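Because the same email address can show up in many activity records, it may be worth avoiding repeated lookups for an address that has already been enriched. The sketch below wraps any email lookup in a small in-memory cache. It is an optional idea rather than part of the original app: `Details`, `LookupFunc`, and `cached` are hypothetical names, and `Details` is a trimmed stand-in for the UserDetails struct.

```go
package main

import (
	"fmt"
	"sync"
)

// Details is a hypothetical, trimmed stand-in for UserDetails.
type Details struct {
	FullName string
	Company  string
}

// LookupFunc is the shape of any email-based enrichment call.
type LookupFunc func(email string) (*Details, error)

// cached wraps lookup so that each distinct email triggers at most one
// successful backing call. The mutex is held during the lookup, which
// keeps the sketch simple at the cost of serializing concurrent calls.
func cached(lookup LookupFunc) LookupFunc {
	var mu sync.Mutex
	seen := make(map[string]*Details)

	return func(email string) (*Details, error) {
		mu.Lock()
		defer mu.Unlock()

		if d, ok := seen[email]; ok {
			return d, nil
		}
		d, err := lookup(email)
		if err != nil {
			return nil, err // errors are not cached, so a later call can retry
		}
		seen[email] = d
		return d, nil
	}
}

func main() {
	calls := 0
	lookup := cached(func(email string) (*Details, error) {
		calls++
		return &Details{FullName: "Example User", Company: "Example Co"}, nil
	})

	lookup("user@example.com")
	lookup("user@example.com")
	fmt.Println("backing calls:", calls) // prints backing calls: 1
}
```

A cache like this lives only as long as the process, so whether it helps depends on how long the function instance stays alive and how often addresses repeat.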

Modifying app.go

This section of the app defines the main topology of the data app. Here you can see that we’re referencing a collection (or table) called user_activity from a resource named pg_db. This is specifically a PostgreSQL database with a table called user_activity, but Turbine (and the Meroxa platform) abstracts that away, so you only need to worry about the name of the resource and the collection that you’re interested in accessing.

We then process that collection via EnrichUserData (detailed below) and write the results back to db in a collection named user_activity_enriched.

In order to hit the Clearbit API, we have to provide an API Key. The RegisterSecret method makes that available to the function by mirroring the environment variable into the context of the function.

func (a App) Run(v turbine.Turbine) error {
	db, err := v.Resources("pg_db")
	if err != nil {
		return err
	}

	// stream is a collection of records; it can't be inspected directly
	stream, err := db.Records("user_activity", nil)
	if err != nil {
		return err
	}

	// makes the environment variable available to the function
	err = v.RegisterSecret("CLEARBIT_API_KEY")
	if err != nil {
		return err
	}

	// EnrichUserData is implemented in the next section;
	// the second return value is ignored in this example
	res, _ := v.Process(stream, EnrichUserData{})

	err = db.Write(res, "user_activity_enriched")
	if err != nil {
		return err
	}

	return nil
}
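Inside the function, the mirrored key is read with `os.Getenv`, which returns an empty string when the variable is unset. A guard along the following lines would make that case explicit instead of letting it surface as a failed API call. `requireEnv` is a hypothetical helper, not part of the Turbine SDK.

```go
package main

import (
	"fmt"
	"os"
)

// requireEnv returns the value of the named environment variable, or an
// error if the variable is unset or empty.
func requireEnv(name string) (string, error) {
	val, ok := os.LookupEnv(name)
	if !ok || val == "" {
		return "", fmt.Errorf("environment variable %s is not set", name)
	}
	return val, nil
}

func main() {
	key, err := requireEnv("CLEARBIT_API_KEY")
	if err != nil {
		fmt.Println("cannot call Clearbit:", err)
		return
	}
	// Never log the key itself; its length is enough to confirm it loaded.
	fmt.Println("key loaded, length:", len(key))
}
```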

Enriching Data with Functions

Each record will be processed by the EnrichUserData function, as seen below. When the program is compiled, this function will be extracted via reflection. Meroxa will automatically create the DAG and orchestrate the data through each component (DB > function > DB).

We included some additional magic on the Payload methods (more info here). The .Set method allows Turbine to modify the payload without having to worry about the underlying format or schema.

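type EnrichUserData struct{}

// Process enriches each record and returns the same slice.
func (f EnrichUserData) Process(stream []turbine.Record) []turbine.Record {
	for i, record := range stream {
		// Use the checked form of the type assertion so a missing or
		// non-string email does not panic.
		email, ok := record.Payload.Get("email").(string)
		if !ok {
			log.Println("error enriching user data: email is missing or not a string")
			break
		}
		log.Printf("Got email: %s", email)

		details, err := EnrichUserEmail(email)
		if err != nil {
			log.Println("error enriching user data: ", err)
			break
		}
		log.Printf("Got UserDetails: %+v", details)

		// Check the error from every Set call, not only the last one.
		fields := []struct{ key, value string }{
			{"full_name", details.FullName},
			{"company", details.Company},
			{"location", details.Location},
			{"role", details.Role},
			{"seniority", details.Seniority},
		}
		for _, fld := range fields {
			if err = record.Payload.Set(fld.key, fld.value); err != nil {
				log.Println("error setting value: ", err)
				break
			}
		}
		if err != nil {
			break
		}

		// record is a copy, so write the modified record back to the slice.
		stream[i] = record
	}

	return stream
}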
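To illustrate why accessor methods let a function ignore the underlying format, here is a small stand-alone sketch of a JSON-backed payload with `Get` and `Set`. It is a hypothetical simplification written with `encoding/json`: it is not Turbine's implementation, and it handles top-level keys only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Payload is a hypothetical stand-in that stores a record as raw JSON bytes.
type Payload []byte

// Get decodes the payload and returns the value stored under key,
// or nil if the key is absent or the payload is not a JSON object.
func (p Payload) Get(key string) any {
	var fields map[string]any
	if err := json.Unmarshal(p, &fields); err != nil {
		return nil
	}
	return fields[key]
}

// Set decodes the payload, stores value under key, and re-encodes it.
// Callers never touch the raw bytes themselves.
func (p *Payload) Set(key string, value any) error {
	fields := map[string]any{}
	if err := json.Unmarshal(*p, &fields); err != nil {
		return err
	}
	fields[key] = value
	out, err := json.Marshal(fields)
	if err != nil {
		return err
	}
	*p = out
	return nil
}

func main() {
	p := Payload(`{"email":"devaris@meroxa.io","user_id":108}`)
	if err := p.Set("company", "Meroxa"); err != nil {
		fmt.Println("set failed:", err)
		return
	}
	fmt.Println(p.Get("email"), p.Get("company"))
}
```

The calling code only names a field and a value, so the storage format could change without touching the enrichment function.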

Testing Locally and Deploying to Production

Modify your app.json to match your resource name and fixture file location. In this example, our fixtures are in fixtures/pg.json.

"resources": {
	"pg_db": "fixtures/pg.json"
}

The pg.json file should have a property that matches the collection specified in app.go. In this example, we’re using user_activity. Our app will take the email address in the payload object, send it to Clearbit, and return the data we specified in clearbit.go.

Data record before running meroxa apps run

"payload": {
    "activity": "registered",
    "updated_at": 1643214353680,
    "user_id": 108,
    "created_at": 1643214353680,
    "id": 1,
    "deleted_at": null,
    "email": "devaris@meroxa.io"
}

Data record after running meroxa apps run

"payload": {
    "activity": "logged in",
    "company": "Meroxa",
    "created_at": 1643411169715,
    "deleted_at": null,
    "email": "devaris@meroxa.io",
    "full_name": "DeVaris Brown",
    "id": 3,
    "location": "Oakland, CA, US",
    "role": "leadership",
    "seniority": "executive",
    "updated_at": 1643411169715,
    "user_id": 108
}
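One quick way to confirm what enrichment changed is to compare the field names of the two payloads. The sketch below does this for the field names shown in the two records above; `addedKeys` is a hypothetical helper written for this check.

```go
package main

import (
	"fmt"
	"sort"
)

// addedKeys returns, in sorted order, the names that appear in after
// but not in before.
func addedKeys(before, after []string) []string {
	seen := make(map[string]bool, len(before))
	for _, k := range before {
		seen[k] = true
	}
	var added []string
	for _, k := range after {
		if !seen[k] {
			added = append(added, k)
		}
	}
	sort.Strings(added)
	return added
}

func main() {
	before := []string{"activity", "updated_at", "user_id", "created_at", "id", "deleted_at", "email"}
	after := []string{
		"activity", "company", "created_at", "deleted_at", "email", "full_name",
		"id", "location", "role", "seniority", "updated_at", "user_id",
	}
	fmt.Println(addedKeys(before, after))
	// prints [company full_name location role seniority]
}
```

The five names it reports match the five fields set in the Process function.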

That looks good, so let’s deploy this data app into production by running meroxa apps deploy.

$ meroxa apps deploy
  Checking for uncommitted changes...
  ✔ No uncommitted changes!
  Validating branch...
  ✔ Deployment allowed from main branch!
  Preparing application "meroxa-clearbit" (golang) for deployment...
  ✔ Application built!
  ✔ Can access to your Turbine resources
  ✔ Application processes found. Creating application image...
  ✔ Platform source fetched!
  ✔ Source uploaded!
  ✔ Successfully built Process image! ("fe983a75-fcb5-469f-a133-86647631ce85")
  ✔ Deploy complete!
  ✔ Application "meroxa-clearbit" successfully created!

And now we’re done!

Recap

This data app showed how easy data activation can be without requiring a user to stitch together a bunch of point solutions. With idiomatic code and the Meroxa Turbine SDK, we can now process and enrich data in real time using the Clearbit API.

If you’d like to see more data app examples, please feel free to make your request in our Discord channel. Otherwise, get started by requesting a free demo of Meroxa and build something cool. Your app could also be featured in our “Data App Spotlight” series.

DeVaris Brown

CEO and Co-Founder @ Meroxa.