Data activation, or reverse ETL, is the process of pulling data from your data warehouse and making it actionable by your business users in their preferred tooling. One of the main ingredients for data activation is data enrichment. Data enrichment enhances existing data by supplementing missing or incomplete data with information from internal or external sources.
The diagram below shows a typical architecture for data activation. Once a data record reaches the warehouse, a service acts upon that record, enriches it with data (internal or external), and places it in whatever destination a stakeholder needs.
The data activation pattern can be used for a number of use cases, including the following:
Customer Service - Gather customer details, support history, and purchase activity all in one place to provide a more tailored experience
Sales - Access more detailed information about leads and their engagement activity to increase conversions and renewals
Marketing - Create personalized and targeted campaigns based on activity to improve lead generation efforts
Using Meroxa to Simplify and Turbocharge Data Activation
With Meroxa’s Turbine Application Framework, you can simplify the data activation process by replacing multiple point solutions for transformation and reverse ETL with code.
In the above diagram, the Meroxa Turbine data app cleans and enriches events from various data sources in real time, so the data is already in a consumable format when it reaches the destination. This saves data-driven organizations considerable amounts of money, resources, and time.
Show Me the Code!
In this example, we use Go to pull records from a PostgreSQL database, enrich each record, and write it to another table in the same PostgreSQL database. The destination can be any resource Meroxa officially supports, such as Snowflake, S3, or Salesforce.
💡 If you want to skip the tutorial to see the full example, check out the GitHub repo.
Requirements
Adding a PostgreSQL Resource to the Meroxa Catalog
The first step in creating a data app is to add the PostgreSQL resource to the Meroxa catalog. If your database supports logical replication, set the logical_replication metadata configuration value to true.
$ meroxa resource create pg_db \
    --type postgres \
    --url postgres://$PG_USER:$PG_PASS@$PG_URL:$PG_PORT/$PG_DB \
    --metadata '{"logical_replication":"true"}'
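The command above interpolates credentials straight into the connection URL, which can produce a malformed URL if the password contains reserved characters such as @ or /. The sketch below shows one way to assemble the URL with the credentials percent-encoded, using only the Go standard library. The helper names and the sample values are hypothetical, and it assumes the consumer of the URL decodes it in the standard way.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// postgresURL assembles a postgres:// connection URL, percent-encoding the
// credentials so characters such as '@' or '/' cannot break the URL.
// Hypothetical helper; not part of the Meroxa CLI or SDK.
func postgresURL(user, pass, host, port, db string) string {
	u := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(user, pass),
		Host:   net.JoinHostPort(host, port),
		Path:   "/" + db,
	}
	return u.String()
}

// passwordFromURL parses a connection URL and returns the decoded password,
// which is a quick way to confirm the encoding round-trips.
func passwordFromURL(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	pass, _ := u.User.Password()
	return pass, nil
}

func main() {
	// Placeholder values; in practice these come from your environment.
	raw := postgresURL("app", "p@ss/word", "db.example.com", "5432", "appdb")
	fmt.Println(raw)
}
```

The printed value can then be passed to the --url flag in place of the hand-assembled string.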
Initializing a Turbine Data App
$ meroxa apps init meroxa-clearbit --lang golang
When you initialize the Turbine app, you’ll see we include a ton of comments and boilerplate to help you get up and going. We’ll be removing most of this for this example, but take a look around and even execute meroxa apps run to see the output of our sample app.
Clearbit Helper Function
The helper below uses the clearbit-go package to wrap Clearbit’s combined enrichment API. It takes an email address, looks up details on the associated person and company, and returns the result as a nicely formatted UserDetails struct.
package main

import (
	"log"
	"os"

	"github.com/clearbit/clearbit-go/clearbit"
)

// UserDetails holds the subset of the Clearbit response that the app uses.
type UserDetails struct {
	FullName        string
	Location        string
	Role            string
	Seniority       string
	Company         string
	GithubUser      string
	GithubFollowers int
}

// EnrichUserEmail looks up the person and company associated with an email
// address via Clearbit's combined enrichment API.
func EnrichUserEmail(email string) (*UserDetails, error) {
	key := os.Getenv("CLEARBIT_API_KEY")
	client := clearbit.NewClient(clearbit.WithAPIKey(key))
	results, resp, err := client.Person.FindCombined(
		clearbit.PersonFindParams{
			Email: email,
		},
	)
	if err != nil {
		// resp may be nil if the request failed before a response arrived.
		if resp != nil {
			log.Printf("error looking up email; resp: %+v", resp.Status)
		}
		return nil, err
	}
	return &UserDetails{
		FullName:        results.Person.Name.FullName,
		Location:        results.Person.Location,
		Role:            results.Person.Employment.Role,
		Seniority:       results.Person.Employment.Seniority,
		Company:         results.Company.Name,
		GithubUser:      results.Person.GitHub.Handle,
		GithubFollowers: results.Person.GitHub.Followers,
	}, nil
}
Modifying app.go
This section of the app defines the main topology of the data app. Here we reference a collection (or table) called user_activity from a resource named pg_db. The resource is a PostgreSQL database with a table called user_activity, but Turbine (and the Meroxa platform) abstracts that away, so you only need to worry about the name of the resource and the collection you want to access.
We then process that collection via EnrichUserData (detailed below) and write the results back to db, into a collection named user_activity_enriched.
To call the Clearbit API, we have to provide an API key. The RegisterSecret method makes the key available to the function by mirroring the environment variable into the function’s context.
func (a App) Run(v turbine.Turbine) error {
	db, err := v.Resources("pg_db")
	if err != nil {
		return err
	}

	stream, err := db.Records("user_activity", nil) // stream is a collection of records, can't be inspected directly
	if err != nil {
		return err
	}

	err = v.RegisterSecret("CLEARBIT_API_KEY") // makes env var available to data app
	if err != nil {
		return err
	}

	res, _ := v.Process(stream, EnrichUserData{}) // function to be implemented

	err = db.Write(res, "user_activity_enriched")
	if err != nil {
		return err
	}

	return nil
}
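Because the secret is mirrored from an environment variable, a missing variable is an easy mistake to make before a local run. As a minimal sketch, a fail-fast check like the one below could be run before starting the app. The requireEnv helper is hypothetical and is not part of the Turbine SDK; it uses only the standard library.

```go
package main

import (
	"fmt"
	"os"
)

// requireEnv returns the value of the named environment variable, or an
// error if it is unset or empty. Hypothetical helper, not a Turbine API.
func requireEnv(name string) (string, error) {
	val, ok := os.LookupEnv(name)
	if !ok || val == "" {
		return "", fmt.Errorf("environment variable %s is not set", name)
	}
	return val, nil
}

func main() {
	if _, err := requireEnv("CLEARBIT_API_KEY"); err != nil {
		fmt.Println("cannot start:", err)
		return
	}
	fmt.Println("CLEARBIT_API_KEY is set")
}
```

Checking up front gives a clear message instead of a failed Clearbit lookup on the first record.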
Enriching Data with Functions
Each record is processed by the EnrichUserData function, shown below. When the program is compiled, this function is extracted via reflection. Meroxa automatically creates the DAG and orchestrates the data through each component (DB > function > DB).
We included some additional magic on the Payload methods (more info here). The .Set method allows Turbine to modify the payload without having to worry about the underlying format or schema.
type EnrichUserData struct{}

func (f EnrichUserData) Process(stream []turbine.Record) []turbine.Record {
	for i, record := range stream {
		log.Printf("Got email: %s", record.Payload.Get("email"))
		details, err := EnrichUserEmail(record.Payload.Get("email").(string))
		if err != nil {
			log.Println("error enriching user data: ", err)
			break
		}
		log.Printf("Got UserDetails: %+v", details)

		// Set each enriched field, stopping at the first failure so that
		// no error is silently overwritten.
		fields := []struct{ key, value string }{
			{"full_name", details.FullName},
			{"company", details.Company},
			{"location", details.Location},
			{"role", details.Role},
			{"seniority", details.Seniority},
		}
		for _, fld := range fields {
			if err = record.Payload.Set(fld.key, fld.value); err != nil {
				break
			}
		}
		if err != nil {
			log.Println("error setting value: ", err)
			break
		}
		stream[i] = record // write the modified copy back into the slice
	}
	return stream
}
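The function above stops processing at the first error, which leaves every later record in the batch unenriched. As an alternative, here is a minimal sketch of the same per-record pattern that skips records it cannot enrich and carries on. To stay runnable without the Turbine SDK or a Clearbit key, it uses plain maps in place of Turbine records and a fake lookup in place of the Clearbit helper; all names in it are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// record stands in for a Turbine record payload; a plain map keeps the
// sketch independent of the SDK.
type record map[string]interface{}

// enricher looks up extra fields for an email address. In the real app this
// role is played by the Clearbit helper.
type enricher func(email string) (map[string]string, error)

// enrichAll adds the enricher's fields to every record that has a string
// "email" value. Records that cannot be enriched are passed through
// unchanged rather than aborting the batch.
func enrichAll(records []record, lookup enricher) []record {
	for _, rec := range records {
		email, ok := rec["email"].(string)
		if !ok {
			continue // no usable email; leave the record as-is
		}
		fields, err := lookup(email)
		if err != nil {
			continue // lookup failed; leave the record as-is
		}
		for k, v := range fields {
			rec[k] = v
		}
	}
	return records
}

// fakeLookup is a stand-in for a real enrichment API.
func fakeLookup(email string) (map[string]string, error) {
	if email == "known@example.com" {
		return map[string]string{"company": "Example Inc", "role": "engineering"}, nil
	}
	return nil, errors.New("no match")
}

func main() {
	out := enrichAll([]record{
		{"id": 1, "email": "known@example.com"},
		{"id": 2, "email": "unknown@example.com"},
		{"id": 3},
	}, fakeLookup)
	for _, rec := range out {
		fmt.Println(rec)
	}
}
```

Whether to skip or stop is a design choice: skipping keeps the pipeline flowing when a single lookup fails, while stopping makes failures harder to miss.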
Testing Locally and Deploying to Production
Modify your app.json to match your resource name and fixture file location. In this example, our fixtures are in fixtures/pg.json:
"resources": {
"pg_db": "fixtures/pg.json"
}
The pg.json file should have a property that matches the collection specified in app.go. In this example, we’re using user_activity. Our app will take the email address in the payload object, send it to Clearbit, and return the data we specified in clearbit.go.
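A mismatch between the collection name in app.go and the property name in the fixture file is easy to overlook. The sketch below checks only that the fixture's top-level JSON object has a property with the expected name. The hasCollection helper is hypothetical, and the inline fixture is a stand-in whose inner contents are deliberately left empty, since the check does not depend on them.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hasCollection reports whether the top-level JSON object in data has a
// property named collection. Hypothetical helper, not part of Turbine.
func hasCollection(data []byte, collection string) (bool, error) {
	var top map[string]json.RawMessage
	if err := json.Unmarshal(data, &top); err != nil {
		return false, fmt.Errorf("parsing fixture: %w", err)
	}
	_, ok := top[collection]
	return ok, nil
}

func main() {
	// Inline stand-in for fixtures/pg.json; in practice, read the file
	// with os.ReadFile and pass its contents here.
	fixture := []byte(`{"user_activity": []}`)

	ok, err := hasCollection(fixture, "user_activity")
	if err != nil {
		fmt.Println("invalid fixture:", err)
		return
	}
	fmt.Println("collection present:", ok)
}
```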
Data record before running meroxa apps run
"payload": {
"activity": "registered",
"updated_at": 1643214353680,
"user_id": 108,
"created_at": 1643214353680,
"id": 1,
"deleted_at": null,
"email": "devaris@meroxa.io"
}
Data record after running meroxa apps run
"payload": {
"activity": "logged in",
"company": "Meroxa",
"created_at": 1643411169715,
"deleted_at": null,
"email": "devaris@meroxa.io",
"full_name": "DeVaris Brown",
"id": 3,
"location": "Oakland, CA, US",
"role": "leadership",
"seniority": "executive",
"updated_at": 1643411169715,
"user_id": 108
},
That looks good, so let’s deploy this data app into production by running meroxa apps deploy.
$ meroxa apps deploy
Checking for uncommitted changes...
✔ No uncommitted changes!
Validating branch...
✔ Deployment allowed from main branch!
Preparing application "meroxa-clearbit" (golang) for deployment...
✔ Application built!
✔ Can access to your Turbine resources
✔ Application processes found. Creating application image...
✔ Platform source fetched!
✔ Source uploaded!
✔ Successfully built Process image! ("fe983a75-fcb5-469f-a133-86647631ce85")
✔ Deploy complete!
✔ Application "meroxa-clearbit" successfully created!
And now we’re done!
Recap
This data app showed how easy data activation can be without requiring a user to stitch together a bunch of point solutions. With idiomatic code and the Meroxa Turbine SDK, we can now process and enrich data in real-time using the Clearbit API.
If you’d like to see more data app examples, please feel free to make your request in our Discord channel. Otherwise, get started by requesting a free demo of Meroxa and build something cool. Your app could also be featured in our “Data App Spotlight” series.