Tray Platform / Use cases / ETL implementations / ETL (small scale)

ETL (small scale)

Note on scaling

The approach explained here is appropriate for processing moderate amounts of data (i.e. tens or hundreds of rows at a time)

You could also adapt the workflow below to use workflow threads in order to carry out the transformation in multiple workflow instances running in parallel, in order to decrease execution time

However, when processing thousands of rows at a time, you should make use of the script method highlighted in ETL (large scale) as your workflows will run signifcantly faster

When processing massive amounts of data - hundreds of thousands of rows at a time - it is recommended that you make use of a data warehousing operation which has specific capacity for high throughput, such as the Redshift COPY operation

Basic implementation

The following workflow illustrates a very simple example of:

  1. Extracting accounts from Salesforce and loading them into a Postgresql database.
  2. Transforming each record by setting the 'External ID' field type as integer
  3. Loading the accounts to a PostgreSQL database

Please note that the instructions below do not go through every single step! You can click here to download and import your own instance of this workflow for testing and complete examination

Note that this setup assumes that you have already created the appropriate schema and tables in your database (please see below for guidance on dynamically creating your tables based on the data that is being returned by the source service):

Scheduled trigger note

When using a Scheduled Trigger to run your ETL periodically, you will very likely need to implement the last runtime method

This has been left of the above workflow for the purposes of simplicity

The basic stages of this are:

1 - Extract the data from the source service (Salesforce accounts in this case) using the Loop connector to paginate through the results. This will come to an end when Salesforce no longer returns an offset token, indicating there are no more records to be found

2 - Transform the 'External_ID__c' field to an integer for each account, using the Text Helpers 'Change Type' operation:

We also use an Object Helpers 'Add key/value pairs' step to create the object with the transformed ID:

3 - Add each account to a list / array:

Best practice note

We use the loop count to increment the list to e.g. 'accounts_list_1', 'accounts_list_2' etc.

This is to ensure that data storage is never overloaded with a large single list over the limit of 400KB

Although this is only really a concern when dealing with thousands of records, it is a good habit to get into!

4 - The 'accounts' list is then retrieved from data storage and loaded to the destination database (Postgresql in this case):

A key thing to note here is that all the accounts are loaded in one single call, rather than looping through the data and making e.g. 100 calls for each account.

Managing data

Using fallback values

It is very likely that you will need to allow for ocassions when certain data is not returned for individual records.

For example it may be that sometimes no phone number has been entered for a particular account.

In this case you can make use of the Tray.io Fallback values feature.

The following shows that if 'Phone' is not returned then the value will be set to 'null' (in this case make sure that null values for 'Phone' are accepted by your database! It may be that you will need to set it to empty string):

'Flattening' data

The above example of extracting data from Salesforce is fairly simple because records data is always returned in a 'flat' single layer.

The following gives an example of a nested data structure from Outreach which is slightly more complicated:

[
{
"type": "prospect",
"id": 48993,
"attributes": {
"engagedAt": "2021-01-17T21:00:37+00:00",
"engagedScore": 1,
"openCount": 1
}
},
{
"type": "prospect",
"id": 14467,
"attributes": {
"engagedAt": "2020-03-07T20:55:32+00:00",
"engagedScore": 4,
"openCount": 1
}
},
{
"type": "prospect",
"id": 29168,
"attributes": {
"engagedAt": "2018-08-14T08:33:12+00:00",
"engagedScore": 1,
"openCount": 1
}
}

As you can see the engagedAt, engagedScore and openCount fields are nested within an attributes object.

If we want to group these and insert to the database with a single insert command for MySQL, Postgresql, etc. we need to 'pull' the nested 'attributes' up into a single top layer.

The following workflow shows how we can achieve this. Note how we can use the $.steps.loop-1.value.attributes.engagedAt jsonpath to get at the engagedAt value nested within 'attributes':

The output flattened structure for a single object then looks like this:

The steps taken here are:

  1. Loop through each object in the array
  2. Use the Object Helpers Add key/value pairs operation to flatten the data into a single layer
  3. Use Data Storage to append each object to a new array
  4. Once all items are looped through we can get the new flattened list and then pass it with a sql insert query.

Dynamic payloads

The above section on using fallback values tells you how to deal with occasions when certain fields in your data are not returned.

However, you may also need to deal with payloads where the actual fields themselves may change each time.

For example, one time the payload may be this:

{
"type": "prospect",
"id": 29168,
"attributes": {
"engagedAt": "2018-08-14T08:33:12+00:00",
"engagedScore": 1,
"openCount": 1
}
}

Another time it may be this:

{
"type": "prospect",
"id": 29168,
"attributes": {
"addressCity": "Chicago",
"companyType": "enterprise",
"engagedAt": "2018-08-14T08:33:12+00:00",
"engagedScore": 1,
"openCount": 1
}
}

And another time it may be this:

{
"type": "prospect",
"id": 29168,
"attributes": {
"engagedAt": "2018-08-14T08:33:12+00:00",
"engagedScore": 1,
"openCount": 1,
"tags": "unregistered, startup"
}
}

Please see the Redshift documentation on dealing with dynamic data for details on how to deal with this scenario.

Other considerations

Failing workflows and error handling

Setting up error handling methods can help mitigate against workflows which might fail and hold up the completion of your threads.

The following example shows how we could get the details around an error associated with getting contact data and put them into a Google Sheets record of errors:

In this case we are extracting the timestamp, contact name and error message to add to each entry in Google Sheets, before breaking the loop to move on to the next contact.

Dynamic schema creation

A requirement for your ETL project may be to dynamically generate the table schema in the destination database - perhaps because there is a vast number of potential fields, which would make the manual process extremely labor-intensive.

That is to say that you will not be manually creating the table schema in your database before setting up the ETL system, and you want your Tray.io workflow to:

  1. Pull data from the source service
  2. Build the correct table schema (including the correct data types, allowed null values etc.) based on the data that has been returned
  3. Automatically create the table in the database and input the first batch of data
  4. Input all subsequent batches of data in accordance with the defined schema

Subsequent to this you may also need to automatically respond to new fields added to the schema, or even such changes as fields being renamed.

If dynamic schema creation is a requirement of your implementation please contact your Sales Engineer or Customer Success representative to discuss the exact requirements.

Problematic input schema

Some Data Warehouses, such as Google Big Query, may have input schema which is difficult to satisfy.

In the case of Big Query, the following 'flat' input data:

"value": [
{
"Email": "muhammad@tray.io",
"IsActive": true,
"Title": "Sales Engineer",
"Id": "0054xxxxxxxxWauSQAS",
"Name": "Muhammad Naqvi"
}
]

Would need converted to the following:

"value": [
"row": {
"data": {
"Email": "muhammad@tray.io",
"IsActive": true,
"Title": "Sales Engineer",
"Id": "0054xxxxxxxxWauSQAS",
"Name": "Muhammad Naqvi"
}
}
]

If you are having difficulties satisfying the schema requirements, please contact your customer succcess representative.