On the road to real-time reporting with incremental data transfer

Posted on January 27, 2020

Reporting Data at FreeAgent

We keep track of a range of customer behaviours in the FreeAgent application as a means to understand engagement and to formulate and measure our OKRs. Examples include support requests raised through our ticketing system, Zendesk; the scores and comments provided by our users during NPS surveys; and event data detailing specific actions taken within the application. As a concrete example of the latter, when a user explains a bank transaction we emit a JSON payload onto a message queue. Part of this payload is shown in the following snippet:

{
 "id": 25,
 "company_id": 50,
 "explained_at": 2020-01-10T12:49:24.000+00:00,
 "transaction_id": 100,
 "nominal_code": 365
}

This provides a record of who is explaining bank transactions at a given time, as well as the information required to calculate the accuracy of our auto-explain methods. The data mentioned in the above examples are processed by a separate Ruby on Rails application, which handles the Extraction of data from various sources, Transformation of this data into the required format and Loading into the appropriate data warehouse (we refer to this as the ETL application). For the above example, this involves consuming the message from the queue, parsing the required attributes from the JSON and storing them in the appropriate MySQL model table.
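As a rough sketch, the consumer side of that step might look something like the code below. The consumer class, the message object and its queue are illustrative assumptions rather than the actual FreeAgent code; BankTransactionExplanation is assumed to be an ActiveRecord model backed by a MySQL table in the ETL application.

require "json"

# Hypothetical consumer for the bank transaction explanation events.
class BankTransactionExplanationConsumer
  def process(message)
    payload = JSON.parse(message.body)

    # Persist only the attributes needed for reporting.
    BankTransactionExplanation.create!(
      id: payload.fetch("id"),
      company_id: payload.fetch("company_id"),
      explained_at: payload.fetch("explained_at"),
      transaction_id: payload.fetch("transaction_id"),
      nominal_code: payload.fetch("nominal_code")
    )
  end
end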

Data is transferred between our ETL application and AWS Redshift, where it is accessible for Looker business reporting.

To explore the data and build reports for the business, we use a business intelligence tool called Looker, which reads data from our Amazon Web Services (AWS) Redshift cluster. A required final step in the ETL process is therefore to transfer data from the MySQL database of the ETL application to our Redshift cluster, and it is our approach to this step that the rest of this post describes.

The Previous Approach

In the past we’ve handled the transfer between databases by performing a full table scan, copying the data to an AWS Simple Storage Service (S3) bucket and inserting into Redshift from there. This simple approach has its advantages: new data tables are easy to add, and schema changes to existing tables in the ETL application database are automatically replicated in Redshift, which is particularly useful when a new, frequently changing data source is introduced. However, over time, and with an increasing number of customers using FreeAgent, the volume of data stored in certain tables made this approach infeasible. For these tables, loading data into Redshift took several hours, preventing up-to-date reports from being available throughout the working day.

Switching to Incremental

Rather than performing a full table scan each time we want to transfer data, we can instead transfer only the additional data generated since the last successful load to Redshift. This assumes that we have some way of selecting the increment to be transferred to Redshift from the ETL application database. Sticking with the event data example, we can simply use the IDs in the JSON payload, since each event is immutable and uniquely indexed by its ID. The following steps outline a recipe for implementing incremental loading to Redshift for event data, but could easily be adapted to data indexed using a different attribute (e.g. a timestamp).

Data increment lookup table

We create a model table in the ETL application to hold the name of the model being transferred and the ID of the last event data entry successfully loaded to Redshift. This provides a reference for where to begin loading events the next time the operation is performed. Note that this will initially be empty for each model, causing a full table scan to be performed. The table, here called RedshiftSyncResult, simply maps a model name to the last ID successfully synced for that model.
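A minimal Rails migration for such a table might look like the following sketch; the column names are illustrative assumptions based on the description above.

# Hypothetical migration; column names are assumptions.
class CreateRedshiftSyncResults < ActiveRecord::Migration[6.0]
  def change
    create_table :redshift_sync_results do |t|
      t.string :model_name, null: false     # e.g. "BankTransactionExplanation"
      t.bigint :last_synced_id, null: false # last ID successfully loaded to Redshift
      t.timestamps
    end

    add_index :redshift_sync_results, :model_name
  end
end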

The model has methods for looking up the next ID to transfer for a particular model table and for saving the last ID after a successful transfer, both of which are used later in the process; a sketch of these follows. Each time a data increment is successfully transferred, a new row is written to this table, so a frequently synced model such as BankTransactionExplanation accumulates one row per successful transfer.
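Under the assumptions above, those helpers could be as simple as this sketch (the method names are made up for illustration):

# Hypothetical model; method names are illustrative.
class RedshiftSyncResult < ApplicationRecord
  # The last ID successfully loaded to Redshift for a model, or 0 if the
  # model has never been synced (forcing a full table scan on the first run).
  def self.last_synced_id_for(model_name)
    where(model_name: model_name).maximum(:last_synced_id) || 0
  end

  # The ID to start the next transfer from.
  def self.next_id_for(model_name)
    last_synced_id_for(model_name) + 1
  end

  # Record a successful transfer by writing a new row.
  def self.record_sync!(model_name, last_id)
    create!(model_name: model_name, last_synced_id: last_id)
  end
end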

Looking up the data increment to transfer

Look up the last synced ID in RedshiftSyncResult (l_id) and the current last ID in the ETL application database (c_id) for the model table being transferred. If no l_id can be found for the model, we set l_id to 0 so that all data will be transferred. The next ID to sync (n_id) is given by adding 1 to l_id. If n_id is greater than c_id there are no new results to transfer; otherwise we move on to the next step. This increment selection process is illustrated below for the BankTransactionExplanation table.
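A minimal Ruby sketch of that selection, reusing the hypothetical helpers above:

# Hypothetical increment selection for one model table.
l_id = RedshiftSyncResult.last_synced_id_for("BankTransactionExplanation") # 0 if never synced
c_id = BankTransactionExplanation.maximum(:id) || 0
n_id = l_id + 1

if n_id > c_id
  # Nothing new since the last successful load; skip this run.
else
  increment = BankTransactionExplanation.where(id: n_id..c_id)
  # ...write the increment out to a CSV file in S3 and copy to Redshift (next step)
end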

Copying to Redshift temporary table

We write out the data increment bounded by n_id and c_id to a CSV file in S3 and then copy from there to a temporary table in Redshift, first creating the temporary table with the correct schema. For the above example, the SQL for the create and copy statements is as follows:

create table temp_bank_transaction_explanations (
  id integer,
  company_id integer,
  explained_at timestamptz,
  transaction_id integer,
  nominal_code varchar(10)
)

copy temp_bank_transaction_explanations (
  id,
  company_id,
  explained_at,
  transaction_id,
  nominal_code
)

from 's3://#{s3.bucket_name}/#{key}'
credentials 'aws_iam_role=#{IAM_ROLE_FOR_LOADING}'
csv
emptyasnull
gzip

with each statement executed in a separate transaction. It may look as though we have to write separate SQL queries for each model we load, but in practice we use a RedshiftUploader class to generate the required SQL for each model. The class accepts a Rails relation as an argument, which specifies the columns from the ETL application model to be considered, and uses a helper method to find the appropriate Redshift data type for a given MySQL data type.
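The actual RedshiftUploader isn't reproduced here, but the type-mapping helper could be along the lines of the sketch below. The module name and the mapping itself are assumptions, and a production mapping needs more care around column limits and numeric precision.

# Hypothetical mapping from ActiveRecord/MySQL column types to Redshift types.
module RedshiftTypeMapper
  TYPE_MAP = {
    integer:  "integer",
    bigint:   "bigint",
    datetime: "timestamptz",
    boolean:  "boolean",
    decimal:  "decimal(18, 2)"
  }.freeze

  # column is an ActiveRecord column object (e.g. one of Model.columns)
  def self.redshift_type_for(column)
    case column.type
    when :string, :text
      "varchar(#{column.limit || 255})"
    else
      TYPE_MAP.fetch(column.type, "varchar(255)")
    end
  end
end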

Inserting into Redshift model table

We then create the Redshift model table if it doesn’t exist and insert the data stored in the temporary table, being careful not to include duplicates. For the above example the SQL is as follows:

create table if not exists bank_transaction_explanations (
  id integer,
  company_id integer,
  explained_at timestamptz,
  transaction_id integer,
  nominal_code varchar(10)
)

insert into bank_transaction_explanations (
  id,
  company_id,
  explained_at,
  transaction_id,
  nominal_code
)
select
  tbte.id,
  tbte.company_id,
  tbte.explained_at,
  tbte.transaction_id,
  tbte.nominal_code
from temp_bank_transaction_explanations tbte
where not exists (
  select
    null
  from
    bank_transaction_explanations bte
  where
    bte.id = tbte.id
)

with the where not exists clause ensuring that records already present in the target table are not inserted again. We again use a utility class to generate the necessary SQL for each model table being loaded. The above two steps are shown schematically in the diagram below.

The flow of data in our incremental transfer between the ETL application and Redshift.

Recording the index of the last synced entry

Save the last loaded ID to RedshiftSyncResult for the appropriate model if the loading process has been successful. This provides the starting point for the next transfer of data.
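With the hypothetical model sketched earlier, this amounts to a single call once the insert has committed, where c_id is the upper bound of the increment that was just loaded:

# Hypothetical: record the new high-water mark only after a successful load.
RedshiftSyncResult.record_sync!("BankTransactionExplanation", c_id)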

The end result is that we can now transfer data increments almost instantaneously, meaning that reports are always available in Looker and we can schedule the transfers to happen much more regularly throughout the day, for example every 15 minutes instead of once per day. One possibility to capitalise on this approach is to display almost real-time behavioural data across the office, providing us with a better connection between our work and what FreeAgent customers are doing in the real world.