
Workflow Threads

When dealing with large amounts of data, workflow threading is a technique that can greatly improve the efficiency and reliability of your workflows.

Threading involves dividing your data into batches and sending them to a callable workflow for processing, then setting up a system which waits until the number of threads finished is equal to the number of threads started before continuing to the next stage in your workflow.

This massively increases the efficiency of your data processing, as each batch is processed in a parallel 'thread' rather than having to wait for the previous one to finish.
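
To make the pattern concrete before looking at the workflows themselves, here is a minimal sketch of the same logic in plain Python, using local threads and counters in place of Tray's callable workflows and account-level data storage (all names and timings here are illustrative, not part of the Tray platform):

```python
import threading
import time

# Illustrative sketch only: in Tray, these counters live in account-level
# data storage and each 'thread' is a run of a callable workflow.
threads_started = 0
threads_finished = 0
lock = threading.Lock()  # stands in for the 'atomic increment' operation

def process_batch(batch):
    global threads_finished
    time.sleep(0.1)  # placeholder for the real per-batch processing
    with lock:
        threads_finished += 1  # the callable workflow's very last step

batches = [list(range(i, i + 10)) for i in range(0, 50, 10)]
for batch in batches:
    with lock:
        threads_started += 1  # incremented once per batch dispatched
    threading.Thread(target=process_batch, args=(batch,)).start()

# The 'compare threads' loop: wait until finished == started
while True:
    with lock:
        if threads_finished == threads_started:
            break
    time.sleep(1)  # in the workflow this is a 5-second delay step

print("All threads complete - continue to the next stage")
```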

This page will take you through how to set up workflow threads, including:

  • Using account-level data storage and the 'atomic increment' operation to set up a system which counts the number of threads running and identifies when they are all complete

  • Using 'environment variables' to pull the main workflow UUID as an identifier for the account-level data

  • Monitoring and taking action if any threads are taking too long to complete

Multiple threads, single list example

To demonstrate this we can look at a workflow which is pulling batches of records to be processed and sending them to a second workflow.

Note that this workflow could be manually triggered as and when you need to use it, or it could use a scheduled trigger.

This example assumes that overloading data storage is not a concern and so all the threads are feeding into a single list, which is then retrieved at the end of the main workflow.

You can download the two workflows involved to import and test yourself:

The explanation below will take you through the main points involved in these workflows, but it will not go through every single step! Please explore, test and analyse the above workflows for full details.

The main workflow

The main workflow looks like this:

The numbered sections are explained as follows:

  1. We get and store the time the threading process started, so that later in the workflow we can retrieve it to check how long the threads have been running

  2. For each batch we use the Atomic Increment operation to increase the threads started count by 1:

    Note that the key is stored at Account level and uses the threads_started_{$.env.workflow_uuid} jsonpath to pick up the uuid of the main workflow, so that we know which workflow this thread is linked to.

  3. Each batch is then sent to the callable workflow

  4. In the last batch, before breaking the pagination loop, we store the url of the child (processing) workflow so that we can retrieve it later in the workflow and include it in notifications when the threads have been running too long

  5. In the compare threads loop, we check to see if the threads finished count matches the threads started count - which indicates that all the threads have finished processing (the complete loop is sketched in code after this list).

    If so then we break the loop and continue to the next stage of the workflow.

    If not, then we delay 5 seconds and check again.

    Note - please see the callable workflow below for details on how the 'threads finished' number is incremented.

  6. Before checking again, we run checks to see how long the threads have been running.

    If it is longer than 20 mins, we notify a Slack channel with the url of the child processing workflow so that investigations can be made as to which thread might be causing the delay.

    If it has been running for over an hour, a notification of termination is sent and the list and thread counts are reset.

  7. The single list which the threads have been creating can then be retrieved (please see the callable workflow below for details on how the list is created):

    Again, note that it uses the jsonpath accounts_list_{$.env.workflow_uuid} to attach the workflow uuid to the list name.

    The list can then be inserted to the database (or whatever other service you wish to use):

  8. Finally, the list and thread counts are reset, ready for the next run of the workflow.

    Make sure the list is set to an empty array, while the thread counts are set to 0 - otherwise you will have a data type mismatch the next time the workflow runs!
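
Pulling steps 1-8 together, the sketch below models the whole main workflow in Python. This is not Tray's API: Redis is used as an assumed stand-in for account-level data storage (both provide atomic increments on a key), the batches are hard-coded in place of the pagination loop, the callable workflow runs as a local thread, and notify_slack, the url and the timings are all illustrative:

```python
import threading
import time
import redis  # assumed stand-in: account-level storage acts like an atomic KV store

r = redis.Redis(decode_responses=True)

WORKFLOW_UUID = "demo-uuid"               # $.env.workflow_uuid in the real workflow
STARTED = f"threads_started_{WORKFLOW_UUID}"
FINISHED = f"threads_finished_{WORKFLOW_UUID}"
RESULTS = f"accounts_list_{WORKFLOW_UUID}"

def notify_slack(message):                # hypothetical helper; a Slack connector step in Tray
    print(f"[slack] {message}")

def callable_workflow(batch):             # stands in for the separate processing workflow
    r.rpush(RESULTS, *[str(x * 2) for x in batch])  # its 'append to list' step
    r.incr(FINISHED)                      # its very last step: atomic increment

start_time = time.time()                  # step 1: record when threading began
batches = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]   # stands in for the pagination loop

for batch in batches:
    r.incr(STARTED)                       # step 2: one atomic increment per batch
    threading.Thread(target=callable_workflow, args=(batch,)).start()  # step 3

child_url = "https://tray.io/..."         # step 4: child workflow url, kept for notifications

warned = False
while True:                               # step 5: the 'compare threads' loop
    if int(r.get(FINISHED) or 0) == int(r.get(STARTED) or 0):
        break                             # all threads finished - move to the next stage
    elapsed = time.time() - start_time    # step 6: how long have threads been running?
    if elapsed > 3600:                    # over an hour: terminate and reset
        notify_slack("Threads terminated after 1 hour")
        break
    if elapsed > 1200 and not warned:     # over 20 mins: warn with the child workflow url
        notify_slack(f"Threads still running after 20 mins: {child_url}")
        warned = True
    time.sleep(5)                         # the 5-second delay before checking again

results = r.lrange(RESULTS, 0, -1)        # step 7: retrieve the single list the threads built
print(f"Processed {len(results)} records: {results}")

r.set(STARTED, 0)                         # step 8: counters back to 0...
r.set(FINISHED, 0)
r.delete(RESULTS)                         # ...in Tray, set the list key to an empty array instead
```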

The processing workflow

The complete processing workflow looks like this:

The steps involved are:

  1. The append account to list step uses account level data storage and the append to list operation to compile the list of messages that the main workflow can then grab when all processing is complete:

    Note that the accounts_list_{$.steps.trigger.#calling_workflow} jsonpath is used to append the uuid of the main workflow to the list's key.

  2. The very last step of the workflow uses Atomic increment to add 1 to the threads finished account level key each time a processing thread completes:

    Again, note that the threads_finished_{$.steps.trigger.#calling_workflow} jsonpath is used to append the uuid of the main workflow to the threads_finished key (both steps are sketched in code below).
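
As a rough sketch, the whole processing workflow reduces to two storage operations keyed by the caller's uuid. This again uses Redis as an assumed stand-in for account-level data storage, and transform is a placeholder for whatever per-record work your workflow actually does:

```python
import redis  # assumed stand-in for Tray's account-level data storage

r = redis.Redis()

def transform(message):
    return message.upper()  # placeholder for the real per-record processing

def processing_workflow(message, calling_workflow_uuid):
    # calling_workflow_uuid stands in for $.steps.trigger.#calling_workflow
    result = transform(message)
    # Step 1: append the result to the shared list, keyed by the caller's uuid
    r.rpush(f"accounts_list_{calling_workflow_uuid}", result)
    # Step 2 (very last step): atomically bump the threads finished counter
    r.incr(f"threads_finished_{calling_workflow_uuid}")

processing_workflow("hello", "demo-uuid")
```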

Important notes

Using a scheduled trigger

When polling with a scheduled trigger, it is very important to make sure that a run of the workflow is allowed to complete before any more runs are triggered. To do this, first click on Advanced Properties and then tick the box as shown:

Your schedule should also be set so that runs have plenty of time to complete. If, for example, 3 attempts at starting a run are made while an earlier run is still completing, those runs are missed. They are not added to a queue.

Note: any data processing steps (working with a messages list, uploading to Airtable etc.) in this example are given for demonstration purposes only. The threading technique is applicable in multiple scenarios and with many different connectors.

Clearing account-level data

When testing, it is helpful to set up a separate manually-triggered workflow which clears account-level data, as explained here.

This means you can easily reset your data if your workflow errors out.
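
If you model the storage as in the earlier sketches, such a reset workflow only needs to put each key back to a known-good state (key names as above; Redis again assumed as the stand-in store):

```python
import redis  # same assumed stand-in as in the sketches above

r = redis.Redis()
WORKFLOW_UUID = "demo-uuid"  # illustrative; $.env.workflow_uuid in Tray

r.set(f"threads_started_{WORKFLOW_UUID}", 0)   # counters back to numeric 0
r.set(f"threads_finished_{WORKFLOW_UUID}", 0)
r.delete(f"accounts_list_{WORKFLOW_UUID}")     # in Tray, set the list to [] instead
```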