Save Money on Workato and Gain Real-Time Data Streaming with Meroxa

By Simon Lawrence

25 Jan 2023

Diagram of the Turbine Data Streaming App Deployed on the Meroxa Platform

In my last post I contrasted data apps with web apps, which was a fairly high-level discussion. This time around, I decided to get a little more hands-on and show you how we’re using data apps at Meroxa to power Meroxa the business. The app I’m going to talk about is one we developed to simplify how we get account and subscription data into Salesforce so our sales and marketing teams can make use of it.

Where we started

Before we get into the app, let’s begin by talking about how we were getting data from our data warehouse to Salesforce. Prior to using our own platform, we used Workato. Workato is a no-code solution that allows you to create “recipes” in its graphical editor. Our recipe pulled data from our data warehouse, made a few API calls, and then wrote data to our Salesforce instance. There wasn’t an option for real-time, so we compromised and set up the recipe to execute hourly. The diagram below illustrates the setup.

[Diagram: the previous Salesforce sync, built on an hourly Workato recipe]

While this setup worked, there were a few points of friction. The first was general lifecycle management of the recipes. The process for managing, testing, and updating recipes was not great, especially for engineers who are used to version control and mature CI/CD pipelines. The second issue was that it was hard for new engineers to quickly understand what the recipe was doing. Understanding a recipe required navigating up and down levels in the Workato editor. We found ourselves wanting to just write code. With the introduction of Turbine, Meroxa’s data application framework that lets you quickly sync, persist, and transform data between data infrastructure, we saw this use case as a perfect candidate for replacement with a Turbine data app.

Where we are now

The diagram below shows a high-level view of our new setup. Instead of a Workato recipe we now have a real-time Turbine data app deployed on the Meroxa platform.

[Diagram: the new setup, a real-time Turbine data app deployed on the Meroxa platform]

Solving this issue with a Turbine data app gave us several benefits. Instead of having to learn a specialized editor, our developers are able to use their existing workflows. By bringing this solution into the realm of code, any engineer on the team can improve and support it. Learning what the app does is now simply a matter of reading the code. Finally, our data app is real-time rather than an hourly batch job.

How’d we do it?

While the picture above is nice, I’d like to get into the details of what’s actually involved.

So what did we need to do to get all these benefits?

  1. Register a Salesforce OAuth app.
  2. Write a Turbine data app to replace the recipe.

The Salesforce App

Our first step was to use the Salesforce Admin Console to create a connected app that we could use to interact with the Salesforce API. I won’t go into detail on creating a connected app; you can find Salesforce’s documentation here. Once the Salesforce app was set up and we had our client_id and client_secret, it was time to actually write our Turbine app.

Writing our Turbine App

Our Turbine app had three main tasks: take the event data, supplement it with info from Stripe, and transform it into the proper format for Salesforce. Let’s see how we were able to accomplish that with a minimal amount of code.

Here we’re using Stripe’s Go client library to fetch subscription information. What’s great about this code is that nothing about it is Turbine-specific. Turbine apps can easily use internal libraries and share code with existing applications, reducing duplication and easing development.

package main

import (
	"github.com/stripe/stripe-go/v72"
	"github.com/stripe/stripe-go/v72/sub"
)

//<SNIP>

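// translateStatus maps Stripe's subscription status to the label we store in
// Salesforce; only past_due needs rewording.
func translateStatus(subStatus stripe.SubscriptionStatus) string {
	if subStatus == stripe.SubscriptionStatusPastDue {
		return "Past Due"
	}
	return string(subStatus)
}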

func (bsf BasicStripeFetcher) fetchSubscriptionStatus(subID string) (string, error) {
	stripe.Key = bsf.apiKey

	subscription, err := sub.Get(subID, nil)
	if err != nil {
		return "", err
	}

	status := translateStatus(subscription.Status)
	return status, nil
}
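
Because the fetcher is plain Go, it is also easy to swap out in tests. The sketch below is a minimal illustration under assumptions, not code from our repository: the SubscriptionFetcher interface and fakeFetcher type are hypothetical names, and plain strings stand in for stripe.SubscriptionStatus so the example runs without the Stripe library.

```go
package main

import (
	"errors"
	"fmt"
)

// SubscriptionFetcher is a hypothetical interface; the post only shows the
// concrete BasicStripeFetcher type.
type SubscriptionFetcher interface {
	fetchSubscriptionStatus(subID string) (string, error)
}

// fakeFetcher is a hypothetical test double that returns canned raw statuses
// instead of calling Stripe.
type fakeFetcher struct {
	statuses map[string]string
}

// translateStatus mirrors the helper above, using a plain string in place of
// stripe.SubscriptionStatus.
func translateStatus(raw string) string {
	if raw == "past_due" {
		return "Past Due"
	}
	return raw
}

// fetchSubscriptionStatus looks up the canned status and translates it.
func (f fakeFetcher) fetchSubscriptionStatus(subID string) (string, error) {
	raw, ok := f.statuses[subID]
	if !ok {
		return "", errors.New("subscription not found: " + subID)
	}
	return translateStatus(raw), nil
}

func main() {
	var fetcher SubscriptionFetcher = fakeFetcher{statuses: map[string]string{
		"sub_1": "past_due",
		"sub_2": "active",
	}}
	for _, id := range []string{"sub_1", "sub_2", "sub_missing"} {
		status, err := fetcher.fetchSubscriptionStatus(id)
		fmt.Printf("%s: status=%q err=%v\n", id, status, err)
	}
}
```

Any code that depends on the interface rather than the concrete Stripe-backed type can then be exercised without network access or an API key.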

The code below shows how we send data to the Salesforce API. Once again, there’s nothing Turbine-specific here; we’re simply manipulating data and calling an API.

package main

import (
	"errors"
	"fmt"
	"log"

	"github.com/simpleforce/simpleforce"
)

type ProductData struct {
	accountId            string
	email                string
	givenName            string
	familyName           string
	planName             string
	stripeSubscriptionId string
	subscriptionStatus   string
	accountCreatedAt     string
}

type SalesforceUpdater interface {
	updateProductInstance(data ProductData) error
}

type BasicSalesforceUpdater struct {
	client *simpleforce.Client
}

//<SNIP>

func (bsu BasicSalesforceUpdater) query(data ProductData) error {
	q := fmt.Sprintf("SELECT FIELDS(ALL) FROM Product_Instance__c WHERE Workspace_Id__c = '%s' LIMIT 1", data.accountId)
	result, err := bsu.client.Query(q)
	if err != nil {
		return err
	}

	if len(result.Records) != 1 {
		return errors.New("unexpected query result")
	}

	obj := result.Records[0]

	if obj == nil {
		return errors.New("No Product Instance Found")
	}

	firstName := obj.StringField("Admin_First_Name__c")

	if firstName == "" {
		return errors.New("Couldn't fetch first name")
	}

	return nil
}

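// updateProductInstance upserts the Product_Instance__c record for this
// account, using Workspace_Id__c as the external ID.
func (bsu BasicSalesforceUpdater) updateProductInstance(data ProductData) error {
	obj := bsu.client.SObject("Product_Instance__c").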
		Set("ExternalIDField", "Workspace_Id__c").
		Set("Workspace_Id__c", data.accountId).
		Set("Name", "Org: "+data.accountId).
		Set("Admin_Email__c", data.email).
		Set("Admin_First_Name__c", data.givenName).
		Set("Admin_Last_Name__c", data.familyName).
		Set("Product__c", data.planName).
		Set("Stripe_Subscription_Id__c", data.stripeSubscriptionId).
		Set("Subscription_Status__c", data.subscriptionStatus).
		Set("Workspace_Created_At__c", data.accountCreatedAt).
		Upsert()

	if obj == nil {
		return errors.New("upsert failed!")
	}

	return nil
}
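
The query helper above interpolates the account ID straight into a SOQL string. That is fine for our numeric IDs, but if the value could ever contain a quote it should be escaped first. The sketch below shows one way to do that; escapeSOQL and buildProductInstanceQuery are hypothetical helpers written for this illustration and are not part of simpleforce.

```go
package main

import (
	"fmt"
	"strings"
)

// escapeSOQL is a hypothetical helper. It backslash-escapes backslashes and
// single quotes so a value can sit safely inside a SOQL string literal.
func escapeSOQL(s string) string {
	r := strings.NewReplacer(`\`, `\\`, `'`, `\'`)
	return r.Replace(s)
}

// buildProductInstanceQuery builds the same query as the post's query method,
// but escapes the workspace ID before interpolating it.
func buildProductInstanceQuery(workspaceID string) string {
	return fmt.Sprintf(
		"SELECT FIELDS(ALL) FROM Product_Instance__c WHERE Workspace_Id__c = '%s' LIMIT 1",
		escapeSOQL(workspaceID),
	)
}

func main() {
	fmt.Println(buildProductInstanceQuery("42"))
	// A value containing quotes stays inside the string literal.
	fmt.Println(buildProductInstanceQuery("x' OR Name != '"))
}
```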

Finally, we pull it all together in our app.go. Here we’re using the Turbine framework to connect to our data source, get the stream of events, and process those events using the helper functions defined above.

package main

//<SNIP>

func (a App) Run(v turbine.Turbine) error {
	platformDB, err := v.Resources("MY_DATA_WAREHOUSE")
	if err != nil {
		return err
	}

	configs := turbine.ResourceConfigs{
		turbine.ResourceConfig{
			Field: "table.types",
			Value: "VIEW",
		},
		turbine.ResourceConfig{
			Field: "incrementing.column.name",
			Value: "account_id",
		},
		turbine.ResourceConfig{
			Field: "validate.non.null",
			Value: "false",
		},
	}
	rr, err := platformDB.Records("tablename", configs)
	if err != nil {
		return err
	}

	//<SNIP>

	v.Process(rr, WriteToSalesforce{})

	return nil
}

// RecordToProductData converts a Turbine record's payload into the
// ProductData shape that we send to Salesforce.
func RecordToProductData(r turbine.Record) ProductData {
	// These unchecked assertions panic if a field is missing or is not a float64.
	accountId := r.Payload.Get("account_id").(float64)
	createdAt := r.Payload.Get("account_created_at").(float64)

	// Optional string fields: a failed assertion leaves the zero value "".
	givenName, _ := r.Payload.Get("user_given_name").(string)
	familyName, _ := r.Payload.Get("user_family_name").(string)
	planName, _ := r.Payload.Get("plan_name").(string)

	return ProductData{
		accountId:            strconv.Itoa(int(accountId)),
		email:                r.Payload.Get("user_email").(string),
		givenName:            givenName,
		familyName:           familyName,
		planName:             planName,
		stripeSubscriptionId: r.Payload.Get("stripe_subscription_id").(string),
		accountCreatedAt:     strconv.Itoa(int(createdAt)),
	}
}
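
The unchecked type assertions in RecordToProductData will panic on a malformed record. If that is a concern, the comma-ok form of a type assertion can turn bad fields into defaults or errors instead. The sketch below is an illustration under assumptions: the payload type is a plain map standing in for Turbine's payload, and stringOr and intString are hypothetical helper names.

```go
package main

import (
	"fmt"
	"strconv"
)

// payload is a stand-in for a decoded record payload; it is not Turbine's type.
type payload map[string]interface{}

// stringOr returns the string stored under key, or fallback if the field is
// missing, nil, or not a string.
func (p payload) stringOr(key, fallback string) string {
	if s, ok := p[key].(string); ok {
		return s
	}
	return fallback
}

// intString renders a numeric field (decoded as float64) as a base-10 integer
// string, returning an error instead of panicking when the field is unusable.
func (p payload) intString(key string) (string, error) {
	f, ok := p[key].(float64)
	if !ok {
		return "", fmt.Errorf("field %q is missing or not a number", key)
	}
	return strconv.FormatInt(int64(f), 10), nil
}

func main() {
	p := payload{
		"account_id": float64(1234),
		"user_email": "admin@example.com",
		"plan_name":  nil,
	}

	id, err := p.intString("account_id")
	fmt.Println(id, err)

	// A nil or missing string field falls back to the default.
	fmt.Printf("%q\n", p.stringOr("plan_name", ""))

	// A missing numeric field becomes an error the caller can log and skip.
	_, err = p.intString("account_created_at")
	fmt.Println(err)
}
```

With helpers like these, a conversion function could return an error alongside the ProductData, and the caller could skip the bad record rather than crash the whole batch.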

type WriteToSalesforce struct{}

func (f WriteToSalesforce) Process(rr []turbine.Record) []turbine.Record {

	//<SNIP> fetching of env vars

	salesforceUpdater, err := newBasicSalesforceUpdater(salesforceInstanceUrl, salesforceClientId, salesforceUser, salesforcePassword, salesforceToken)
	if err != nil {
		log.Fatal("ERROR: salesforce updater creation failed")
	}

	for _, r := range rr {
		pd := RecordToProductData(r)
		subscriptionStatus, err := subscriptionFetcher.fetchSubscriptionStatus(pd.stripeSubscriptionId)
		if err != nil {
			// skip records whose subscription we can't look up
			log.Printf("ERROR: fetching subscription status failed: %v", err)
			continue
		}

		// update our data with information from Stripe
		pd.subscriptionStatus = subscriptionStatus
		if err := salesforceUpdater.updateProductInstance(pd); err != nil {
			log.Printf("ERROR: salesforce update failed: %v", err)
		}
	}

	// return original records unmodified
	return rr
}

In the interest of space, I’ve included only the interesting snippets of code, but the full source files can be found here.

We’re currently running this application in production and it has allowed us to save almost $150,000 per year by ending our use of Workato. We already have a few updates in the pipeline to give our marketing and sales teams even more data. Look for future posts where I cover any updates we roll out.

Hopefully, you come away from this post with an appreciation of how Turbine data apps can solve a class of problems that almost all companies have. Let us know what you think by joining the discussion on our Discord channel or in GitHub Discussions. We can’t wait to see what you build. Click here to get started.


Simon Lawrence

VP of Engineering at Meroxa