Data is at the heart of our business. We use data to make business critical decisions on a daily basis. It is important that this data is not only accurate but also available when required. Traditionally reports would be generated at a set schedule which made it difficult to decide on next steps in a timely fashion. New technologies like Amazon Kinesis Data Streams enable us to generate these reports in near real time.
In this blog post we describe the software system responsible for processing our event data and making it available for visualisations and reports in our business intelligence tool. We will also look at the positive impact that AWS managed services can have to lower the burden on our operations team.
To efficiently write event data into Redshift we have built a number of services, each with a very specific task:
- Producer – This application receives event data via HTTP and writes the data to an Amazon Kinesis data stream using the Kinesis Producer Library (KPL).
- Consumer – The consumer application is a stream processing service that turns events into smaller batches that can be imported independently.
- Importer – The importer is responsible for copying batches of events from S3 into Redshift.
A fault in any one of these services must not impact the entire pipeline. A number of supporting AWS services and external services monitor the health of this system:
- CloudWatch Logs – Our logs are stored in CloudWatch and can easily be accessed by our engineers.
- CloudWatch Metrics – Application performance and overall health of the system is tracked using CloudWatch.
- CloudWatch Alarms – Breaching thresholds or unusual variance in these metrics is constantly monitored and will trigger an appropriate action to make us aware of any change (e.g. Slack or PagerDuty notification).
- Rollbar – Application errors are all tracked in a central place using Rollbar.
Below is a diagram that outlines the architecture.
The main entrypoint of this software system is our Producer. The producer is a Java application that runs a Vert.x web server to receive webhooks from both external and internal systems. Vert.x has shown significant performance characteristics in benchmarks like the TechEmpower Framework Benchmark.
Incoming events are first validated and then handed over to the Kinesis Producer Library (KPL) which is provided by Amazon Web Services (AWS). The KPL solves a number of challenging problems when dealing with data streams and specifically with Amazon Kinesis. Some of the benefits of using the KPL are:
- Record aggregation for better performance and cost optimisations
- Publishing of metrics for monitoring
- Efficient batching of records with retry logic
We decided to keep all our events in JSON format for readability and flexibility when it comes to publishing events. In the future we would like to explore some alternatives that could enforce a schema and perhaps reduce the size of our events to optimise for cost.
We monitor the request rate, error rate and latency as some of the key indicators for application performance.
Generating Micro Batches
In the Consumer application we make use of another library offered by AWS, the Kinesis Consumer Library (KCL). Like the KPL, the KCL deals with all the complex and difficult tasks like record deaggregation, metric publication and load balancing across different workers when using multiple partitions.
It is worth noting that this application is written in Java. Whilst there is support for other languages to use the KCL, we found that the support and performance of using the KCL directly is far superior to using the MultiLang Daemon.
Events received from the Kinesis stream are converted into facts and dimensions which are easily queryable from Redshift. These facts and dimensions are buffered into individual chunks of CSV files as shown in the image below.
We write this data to S3 along with some metadata (the target table and the columns included in the file) on a set schedule or before the buffers get too big to fit in memory. We checkpoint our position in the stream when we have successfully flushed the buffers. This is important to avoid duplicate records in our data warehouse.
If you have used Kinesis Firehose before, you will notice that this concept sounds very similar and begs the question: Why have we written an application that does exactly that? One drawback of the Kinesis Firehose that we found is the fact that a Firehose can only target a single Redshift table at a time. That makes it very inflexible for our use case as we would like to support 100s of event types in the future, all with very different payloads and therefore columns. Complex record transformations are also easier to deal with in a general purpose language like Java.
At this point our event throughput is low enough that we don’t need to compress these CSV files. As our throughput increases we would like to explore the different compression algorithms which are supported by Redshift and see how this impacts copy performance and upload latency.
The health of the application can be monitored by checking the processing lag. Fortunately, the KCL publishes application level metrics like MillisBehindLatest that can be a good indication if your pipeline is slowing down or has stopped entirely.
Once a buffer is successfully written to S3 we use S3 bucket notifications in conjunction with SQS to finally import data into our Redshift cluster. We have agreed on a convention where all data loads are written to the loads/ prefix. All data unloads (using the UNLOAD command) are written to the unloads/ prefix.
The Importer continuously receives messages from the SQS queue with information about new files to be imported. The attached metadata is used to determine the correct Redshift table and which columns are included in the CSV file. This service is configured to always run exactly one service at a time on our ECS cluster. During application startup we perform any necessary schema changes which are checked into our source control system. These schema changes are tested in a staging environment before they are applied in our production environment. We create regular backups of our data in Redshift to mitigate the impact of cluster failure.
We issue a COPY command in order to import data into Redshift. This is the recommended and most efficient way to import data into Redshift.
On the rare occasion when an import fails due to intermittent connectivity issues or during cluster maintenance we use an exponential backoff strategy in our application to retry any failed attempts to import data. Our queue is configured to send any message that couldn’t be successfully processed after a certain number of attempts to a special dead letter queue which is monitored via CloudWatch. This prevents us from blocking the pipeline if a corrupt file is received. Thanks to the configured monitoring we can start investigating failed imports without losing the message.
Visualisations & Reports
With the data available in Redshift, we can start to build models to represent this information. We use Looker to generate rich visualisations and reports from our event data. These reports can be generated and retrieved in a self service fashion which is very positively received by the rest of the business.
We will publish a blog post in the near future about how we use Looker to generate visualisations and reports for our business needs. Watch this space!
The solution described above allows us to efficiently move event data into our data warehouse with minimum latency (time for event data to be published before available in Redshift). With at most one minute of latency we can generate rich visualisations and reports based on live data. The solution can also cope with most faults that we can anticipate at this point without losing any data. The use of managed services, where appropriate, makes the maintenance of this system very easy.
If these sorts of challenges interest you and you are passionate about building data-centric applications then check out our careers page. We are hiring into our Data Platform team.