Combining data from different sources with SageMaker pipelines

Posted on August 2, 2023

Generating datasets for machine learning

Preparing data and generating datasets is a crucial step in training a machine learning model. If you are lucky, your data might come from a single .csv file. In most cases, however, pulling together the input features to train your model will require combining datasets from different sources, and doing this manually can be time-consuming and error-prone.

At FreeAgent, we know that regularly training our machine learning models with additional, more recent data is key to maintaining model performance. With this in mind, we decided to automate the generation of the files used to train and evaluate the model.

In this post we describe how to use a SageMaker pipeline to combine files from different sources. We show how to use a SageMaker processing job to query data in Athena and Redshift, and how to combine the query outputs with other files saved in S3 to generate training and evaluation datasets.

When we worked on this project we found it difficult to find documentation to help us build this pipeline. We hope the following example will be useful to others, and please do not hesitate to get in touch with any comments or questions.

SageMaker pipelines

SageMaker Pipelines is a feature of Amazon SageMaker that allows you to create end-to-end machine learning workflows composed of steps.

We already used a SageMaker pipeline to train our model, so it made sense to use a similar approach to create our datasets.

Workflow

The pipeline's workflow is outlined below. In our case we needed to build a proof-of-concept model by combining data from three different sources:

  • Customer attributes stored in CSV format in S3
  • Standard FreeAgent feature usage data from our Redshift data warehouse
  • Highly nested data returned by a third party, queried using Athena

The outputs of the queries are saved in S3. A Python script then combines the query outputs with the additional CSV files already in S3. This script runs in a processing step, a type of step that can be used in a SageMaker pipeline workflow to run a data processing job. The combined data is then split into training and evaluation datasets, which are saved to a specific location in S3.

Setting up the query strings

When selecting specific features for a machine learning problem, queries can be very long and impractical to hard-code in the SageMaker pipeline. An alternative is to save the Athena and Redshift queries in your project as .sql files (query.athena.sql and query.redshift.sql in our case) and use the following function to read them into a variable.

def read_query_string_from_file(file_path):
    with open(file_path, "r") as sql_file:
        query = sql_file.read()
    return query

Each query string can then be read in with the appropriate formatting. Note that the Redshift query was particularly sensitive to single and double quotes; if your query is not formatted correctly, your pipeline will fail. If possible, we recommend testing your query directly in the Redshift query editor before testing your pipeline.

athena_query_string = read_query_string_from_file("query.athena.sql")

redshift_query_string = read_query_string_from_file(
    "query.redshift.sql"
).replace("'", "\\'")

Configuring the DatasetDefinition

We mentioned above that we will process the data with a processing step. Each input to the processing step is either a file loaded from S3 (S3Input) or a DatasetDefinition. The DatasetDefinition types support data sources that can be queried via Athena and Redshift.

AthenaDatasetDefinition

We can now use the query string above in an AthenaDatasetDefinition input to the pipeline. The AthenaDatasetDefinition has the following required attributes:

  • Catalog: the name of the catalog used in the Athena query execution
  • Database: the name of the database used in the Athena query execution
  • OutputFormat: the data storage format for the Athena query results
  • OutputS3Uri: the location in Amazon S3 where the Athena query results will be stored
  • QueryString: the SQL query statement to be executed

The Athena dataset definition does not support CSV as an output format. In this example we chose the `TEXTFILE` OutputFormat, where the outputs are saved as compressed Unicode-encoded files.

from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition

def athena_query_output_dataset():
    return AthenaDatasetDefinition(
        catalog="data_catalog",
        database="database_name",
        query_string=athena_query_string,
        output_s3_uri="s3_uri_query_output_location",
        output_format="TEXTFILE",
    )

RedshiftDatasetDefinition

We can also use the query string above in a RedshiftDatasetDefinition input. The RedshiftDatasetDefinition has the following required attributes:

  • ClusterId: the Redshift cluster identifier
  • ClusterRoleArn: the IAM role attached to your Redshift cluster that Amazon SageMaker uses to generate datasets
  • Database: the name of the Redshift database used in the Redshift query execution
  • DbUser: the database user name used in the Redshift query execution
  • OutputFormat: the data storage format for the Redshift query results
  • OutputS3Uri: the location in Amazon S3 where the Redshift query results will be stored
  • QueryString: the SQL query statement to be executed

You also need to make sure that your db_user has permission to query the relevant tables in Redshift.

from sagemaker.dataset_definition.inputs import RedshiftDatasetDefinition

def redshift_query_output_dataset():
    return RedshiftDatasetDefinition(
        cluster_id="cluster_id",
        cluster_role_arn="cluster_role_arn",
        database="database_name",
        db_user="user",
        query_string=redshift_query_string,
        output_s3_uri="s3_uri_query_output_location",
        output_format="CSV",
    )

Putting everything together in the processing step

Once we have configured the Athena and Redshift DatasetDefinitions we can create a processing step. To create a processing step we need a processor to define the environment our processing script should be run in (such as the container and the type of instance). In the example below we show how to use an SKLearnProcessor, which allows you to create a processing job with scikit-learn and its dependencies available to use. You can also customise your processor with your own Docker image (containing your specific dependencies) using sagemaker.processing's ScriptProcessor; a sketch of this alternative follows the example below.

from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    role="role_arn",
    instance_type="ml.t3.medium",
    instance_count=1, 
    command=["python"],
    base_job_name="generate_datasets",
)
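
If you need dependencies beyond scikit-learn, a minimal sketch of the ScriptProcessor alternative mentioned above might look like the following (the image URI and other values are illustrative placeholders, not our actual configuration):

from sagemaker.processing import ScriptProcessor

# A processor backed by a custom Docker image containing your own dependencies.
# The image URI, role ARN and instance settings are placeholders.
script_processor = ScriptProcessor(
    image_uri="account_id.dkr.ecr.region.amazonaws.com/custom-processing-image:latest",
    command=["python3"],
    role="role_arn",
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name="generate-datasets",
)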

We can then define the inputs and outputs of our processing job, as well as the Python code which will be used to combine the inputs and split them into the training and evaluation dataset outputs.

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import DatasetDefinition
from sagemaker.workflow.steps import ProcessingStep

def generate_training_and_evaluation_datasets():
    return ProcessingStep(
        name="generate-datasets",
        processor=sklearn_processor,
        inputs=[
            ProcessingInput(
                source="file_s3_uri",
                destination="location_on_container",
                input_name="file_input",
            ),
            ProcessingInput(
                input_name="athena_input",
                dataset_definition=DatasetDefinition(
                    local_path="location_on_container", 
                    athena_dataset_definition=athena_query_output_dataset(),
                ),
                destination="location_on_container",
            ),
            ProcessingInput(
                input_name="redshift_input",
                dataset_definition=DatasetDefinition(
                    local_path="location_on_container",
                    redshift_dataset_definition=redshift_query_output_dataset(),
                ),
                destination="location_on_container",
            ),
        ],
        outputs=[
            ProcessingOutput(
                source="location_on_container",
                destination="s3_uri",
                output_name="training_data",
            ),
            ProcessingOutput(
                source="location_on_container",
                destination="s3_uri",
                output_name="evaluation_data",
            ),
        ],
        code="generate_training_and_evaluation_datasets.py",
    )

The query outputs are generated as multiple files (compressed in the case of the Athena query) and without column headers. We added functionality to the Python script to combine all the output files in a given location into a single pandas DataFrame, with the column names matching the feature fields extracted in each query.
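
As an illustration, here is a minimal sketch of that combining logic, assuming pandas can read the (optionally gzipped) delimited output files and that column_names is a list we maintain to match the fields selected in the query (the function name and delimiter are illustrative choices, not our production code):

import glob
import os

import pandas as pd

def combine_query_outputs(input_dir, column_names, sep=","):
    """Combine every query output file in input_dir into a single DataFrame.

    The query outputs have no header row, so column names are supplied
    explicitly and must match the fields selected in the query. Compressed
    files are handled by pandas based on their file extension.
    """
    frames = [
        pd.read_csv(file_path, header=None, names=column_names, sep=sep)
        for file_path in sorted(glob.glob(os.path.join(input_dir, "*")))
    ]
    return pd.concat(frames, ignore_index=True)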

The rest of the Python script joined the various DataFrames and split the combined inputs into the training and evaluation datasets.
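
For completeness, here is a hedged sketch of that joining and splitting step, assuming the sources share a customer_id key and using an 80/20 split (both illustrative choices rather than our production values):

from sklearn.model_selection import train_test_split

def build_datasets(customer_df, redshift_df, athena_df, output_dir):
    # Join the three sources on a shared key; "customer_id" is illustrative.
    combined = customer_df.merge(redshift_df, on="customer_id").merge(
        athena_df, on="customer_id"
    )

    # Split the combined data into training and evaluation sets.
    training, evaluation = train_test_split(combined, test_size=0.2, random_state=42)

    # Write to the container paths that the ProcessingOutput definitions point at.
    training.to_csv(f"{output_dir}/training/training.csv", index=False)
    evaluation.to_csv(f"{output_dir}/evaluation/evaluation.csv", index=False)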

Running the pipeline

All that remained was to define the pipeline itself to run the processing step.

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig

def build_pipeline():
    return Pipeline(
        name="data-pipeline",
        pipeline_experiment_config=PipelineExperimentConfig(
            "experiment_name",
            "trial_name",
        ),
        steps=[generate_training_and_evaluation_datasets()],
    )
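
With the pipeline defined, it can be registered and executed with the standard SDK calls; a minimal sketch (the role ARN is a placeholder):

pipeline = build_pipeline()

# Create the pipeline definition in SageMaker, or update it if it already exists.
pipeline.upsert(role_arn="role_arn")

# Start an execution and wait for it to complete.
execution = pipeline.start()
execution.wait()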

Summary

We successfully implemented a SageMaker pipeline which automatically ran queries in Redshift and Athena and combined the query outputs with files located in S3 to generate training and evaluation datasets for our classification model. 

We now have the flexibility to add functionality to run the data generation pipeline and our model training pipeline back to back. 

The pipeline currently comprises a single step and could arguably be replaced by a standalone processing job. However, we chose to use a SageMaker pipeline so that we have the flexibility to add other steps to the workflow in the future, such as data quality checks on the generated datasets.

As mentioned in the introduction, when we worked on this project we found it difficult to find documentation to help us build this pipeline. We hope it will be useful to others, and please do not hesitate to get in touch with any comments or questions.
