Load Data from Amazon Web Services (AWS) S3

SingleStore Pipelines can extract objects from Amazon S3 buckets, optionally transform them, and insert them into a destination table. To understand Amazon S3’s core concepts and the terminology used in this topic, please read the Amazon S3 documentation.

Prerequisites

To complete this Quickstart, your environment must meet the following prerequisites:

  • AWS Account: This Quickstart uses Amazon S3 and requires an AWS account’s access key ID and secret access key.

  • SingleStore installation –or– a SingleStore cluster: You will connect to the database or cluster and create a pipeline to pull data from your Amazon S3 bucket.

Part 1: Creating an Amazon S3 Bucket and Adding a File

  1. On your local machine, create a text file with the following CSV contents and name it books.txt:

    The Catcher in the Rye, J.D. Salinger, 1945
    Pride and Prejudice, Jane Austen, 1813
    Of Mice and Men, John Steinbeck, 1937
    Frankenstein, Mary Shelley, 1818
  2. In S3, create a bucket and upload books.txt to the bucket. For information on working with S3, refer to the Amazon S3 documentation.

    Note that the aws_access_key_id that your SingleStore pipeline will use (specified in the next section in CREATE PIPELINE library ... CREDENTIALS ...) must have read access to both the bucket and the file.
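Optionally, the file contents can be sanity-checked locally before uploading. The following is a minimal sketch, not part of the SingleStore tooling: it uses Python's csv module as a rough stand-in for comma-delimited parsing, and the helper name parse_books is hypothetical.

```python
import csv
import io

# The same four lines as books.txt in step 1.
BOOKS_TXT = """\
The Catcher in the Rye, J.D. Salinger, 1945
Pride and Prejudice, Jane Austen, 1813
Of Mice and Men, John Steinbeck, 1937
Frankenstein, Mary Shelley, 1818
"""


def parse_books(text):
    """Split each line on commas, roughly as a comma-delimited load would."""
    return list(csv.reader(io.StringIO(text)))


rows = parse_books(BOOKS_TXT)
for row in rows:
    # Every line should yield exactly three fields: title, author, date.
    assert len(row) == 3, row
    print(row)
```

Because each comma in books.txt is followed by a space, the second and third fields keep a leading space when split on the comma alone.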

Once the books.txt file has been uploaded, you can proceed to the next part of the Quickstart.

Part 2: Generating AWS Credentials

To be able to use an S3 bucket within the pipeline syntax, the following minimum permissions are required:

  • s3:GetObject

  • s3:ListBucket

These permissions provide read-only access to the S3 bucket, which is the minimum a pipeline needs to ingest data.

There are two ways to create an IAM Policy: with the Visual editor or with JSON. Both methods require obtaining the bucket's Amazon Resource Name (ARN) before the policy is created.

Create an IAM Policy Using the Visual Editor

  1. Log into the AWS Management Console.

  2. Obtain the Amazon Resource Name (ARN) and region for the bucket. The ARN and region are located in the Properties tab of the bucket.

  3. Select IAM from the list of services.

  4. Click on Policies under Access Management and click the Create policy button.

  5. Using the Visual editor:

    1. Click on the Service link and select S3 from the list or manually enter S3 into the search block.

    2. Click on the S3 link from the available selections.

    3. In the Action section, click the List and Read checkboxes.

    4. Under Resources, click the bucket link and click on the Add ARN link. Enter the ARN and bucket name and click the Add button.

    5. Request conditions are optional.

Create an IAM Policy Using JSON

  1. To use JSON for policy creation, copy the information in the code block below into the AWS JSON tab. Make sure to change the bucket name.

    {
      "Version": "2012-10-17",
      "Statement": [{
        "Sid": "VisualEditor1",
        "Effect": "Allow",
        "Action": [
          "s3:GetObject",
          "s3:ListBucket"
        ],
        "Resource": [
          "arn:aws:s3:::<bucket_name>",
          "arn:aws:s3:::<bucket_name>/*"
        ],
        "Condition": {
          "StringNotEquals": {
            "aws:SourceVpce": "<vpc - id>"
          }
        }
      }]
    }
  2. Click the Add tag button if needed and click Next: Review.

  3. Enter a policy name; this is a required field. The description field is optional. Click Create policy to finish.
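The policy document can also be generated from a bucket name rather than edited by hand. The following is a minimal sketch under stated assumptions: the function name build_read_policy and the Sid value PipelineReadOnly are hypothetical, and the optional request condition is left out.

```python
import json


def build_read_policy(bucket_name):
    """Return an IAM policy document granting read-only access to one bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "PipelineReadOnly",  # hypothetical statement ID
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": [
                f"arn:aws:s3:::{bucket_name}",    # the bucket itself (s3:ListBucket)
                f"arn:aws:s3:::{bucket_name}/*",  # objects in the bucket (s3:GetObject)
            ],
        }],
    }


policy = build_read_policy("my-bucket-name")
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the AWS JSON tab. Both resource entries are needed because s3:ListBucket applies to the bucket ARN while s3:GetObject applies to the object ARNs.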

Assign the IAM Policy to a New User

  1. In the IAM services, click on Users and click the Add users button.

  2. Enter a name for the new user and click Next.

  3. Select the Attach policies directly radio button. Use the search box to find the policy or scroll through the list of available policies.

  4. Click the checkbox next to the policy to be applied to the user and click Next.

  5. Click the Create user button to finish.

Create Access Keys for Pipeline Syntax

Access keys will need to be generated for the newly created user.

  1. In the IAM services, click on Users and click the newly created user name.

  2. Click on the Security credentials tab.

  3. In the access keys section, click on the Create access key button.

  4. Click on the Third-party service radio button and click Next.

  5. Although setting a description tag is optional, SingleStore recommends setting one, especially when multiple keys are needed. Click the Create key button to continue.

  6. On the final screen, you can either download a .csv file containing the access key ID and secret access key or copy the values directly. Click Done when finished.

  7. Below is the basic syntax for using an access key and a secret access key in a pipeline:

    CREATE PIPELINE <pipeline_name> AS
    LOAD DATA S3 's3://<bucket_name>/<file_name>'
    CONFIG '{"region":"us-west-2"}'
    CREDENTIALS '{"aws_access_key_id": "<access_key_id>",
                  "aws_secret_access_key": "<secret_access_key>"}'
    INTO TABLE <destination_table>
    FIELDS TERMINATED BY ',';

Warning

If the key information is not downloaded or copied to a secure location before clicking Done, the secret key cannot be retrieved, and will need to be recreated.

Part 3: Creating a SingleStore Database and S3 Pipeline

Now that you have an S3 bucket that contains an object (file), you can use SingleStore to create a new pipeline and ingest the file's contents.

Create a new database and a table that adheres to the schema contained in the books.txt file. At the prompt, execute the following statements:

CREATE DATABASE books;
USE books;
CREATE TABLE classic_books
(
  title VARCHAR(255),
  author VARCHAR(255),
  date VARCHAR(255)
);

These statements create a new database named books and a new table named classic_books, which has three columns: title, author, and date.

Now that the destination database and table have been created, you can create an S3 pipeline. In Part 1 of this Quickstart, you uploaded the books.txt file to your bucket. To create the pipeline, you will need the following information:

  • The name of the bucket, such as: <bucket-name>

  • The name of the bucket’s region, such as: us-west-1

  • Your AWS account’s access keys, such as:

    • Access Key ID: <aws_access_key_id>

    • Secret Access Key: <aws_secret_access_key>

  • Your AWS account's session token, such as:

    • Session Token: your_session_token

    • Note that the aws_session_token is required only if the credentials in the CREDENTIALS clause are temporary.

Using these identifiers and keys, execute the following statement, replacing the placeholder values with your own.

CREATE PIPELINE library
AS LOAD DATA S3 'my-bucket-name'
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key", "aws_session_token": "your_session_token"}'
INTO TABLE `classic_books`
FIELDS TERMINATED BY ',';
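The CONFIG and CREDENTIALS values are JSON strings embedded in SQL, so generating them avoids quoting mistakes. The following is a minimal sketch, assuming the values contain no single quotes or backslashes; the helper names credentials_json, sql_literal, and create_pipeline_sql are hypothetical and not part of any SingleStore client. The session token is included only when one is supplied, matching the note on temporary credentials.

```python
import json


def credentials_json(access_key_id, secret_access_key, session_token=None):
    """Build the JSON value for the CREDENTIALS clause."""
    creds = {
        "aws_access_key_id": access_key_id,
        "aws_secret_access_key": secret_access_key,
    }
    # Only temporary credentials carry a session token.
    if session_token:
        creds["aws_session_token"] = session_token
    return json.dumps(creds)


def sql_literal(value):
    """Wrap a string in single quotes; reject characters this sketch does not escape."""
    if "'" in value or "\\" in value:
        raise ValueError("value contains a quote or backslash")
    return f"'{value}'"


def create_pipeline_sql(bucket, region, creds):
    """Assemble the CREATE PIPELINE statement used in this Quickstart."""
    config = json.dumps({"region": region})
    return "\n".join([
        "CREATE PIPELINE library",
        f"AS LOAD DATA S3 {sql_literal(bucket)}",
        f"CONFIG {sql_literal(config)}",
        f"CREDENTIALS {sql_literal(creds)}",
        "INTO TABLE `classic_books`",
        "FIELDS TERMINATED BY ',';",
    ])


statement = create_pipeline_sql(
    "my-bucket-name",
    "us-west-1",
    credentials_json("your_access_key_id", "your_secret_access_key"),
)
print(statement)
```

In practice the key values would come from a secure source such as environment variables rather than being written into the script.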

You can see what files the pipeline wants to load by running the following:

SELECT * FROM information_schema.PIPELINES_FILES;

If everything is properly configured, you should see one row in the Unloaded state, corresponding to books.txt. The CREATE PIPELINE statement creates a new pipeline named library, but the pipeline has not yet been started, and no data has been loaded. A SingleStore pipeline can run either in the background or be triggered by a foreground query. Start it in the foreground first.

START PIPELINE library FOREGROUND;

When this command returns success, all files from your bucket will be loaded. If you check information_schema.PIPELINES_FILES again, you should see all files in the Loaded state. Now query the classic_books table to make sure the data has actually loaded.

SELECT * FROM classic_books;
+------------------------+-----------------+-------+
| title                  | author          | date  |
+------------------------+-----------------+-------+
| The Catcher in the Rye |  J.D. Salinger  |  1945 |
| Pride and Prejudice    |  Jane Austen    |  1813 |
| Of Mice and Men        |  John Steinbeck |  1937 |
| Frankenstein           |  Mary Shelley   |  1818 |
+------------------------+-----------------+-------+

You can also have SingleStore run your pipeline in the background. In this configuration, SingleStore periodically polls S3 for new files and continuously loads them as they are added to the bucket. Before running your pipeline in the background, you must reset the state of the pipeline and the table.

DELETE FROM classic_books;
ALTER PIPELINE library SET OFFSETS EARLIEST;

The first command deletes all rows from the target table. The second causes the pipeline to start from the beginning, in this case, forgetting it already loaded books.txt so you can load it again. You can also drop and recreate the pipeline, if you prefer.

To start a pipeline in the background, run:

START PIPELINE library;

This statement starts the pipeline. To see whether the pipeline is running, run SHOW PIPELINES.

SHOW PIPELINES;
+----------------------+---------+
| Pipelines_in_books   | State   |
+----------------------+---------+
| library              | Running |
+----------------------+---------+

At this point, the pipeline is running and the contents of the books.txt file should once again be present in the classic_books table.

Note

Foreground pipelines and background pipelines have different intended uses and behave differently. For more information, see the START PIPELINE topic.

Load Data Using pipeline_source_file()

Pipelines can extract, transform, and insert objects from an Amazon S3 bucket into a destination table. Pipelines can persist the name of a file by using the pipeline_source_file() helper.

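The following example loads three files (Books/Horror.csv, Books/Nautical.csv, and Books/science_fiction.csv) into a table using an S3 pipeline. Each file contains a single numeric column with one ISBN per line, and the pipeline stores the name of the source file alongside each value. First, create the destination table: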

CREATE TABLE book_inventory(isbn NUMERIC(13),title VARCHAR(50));

Create an S3 pipeline to ingest the data.

CREATE PIPELINE books AS
LOAD DATA S3 's3://<bucket_name>/Books/'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "<access_key_id>",
              "aws_secret_access_key": "<secret_access_key>"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE book_inventory
(isbn)
SET title = pipeline_source_file();

Test the pipeline:

TEST PIPELINE books LIMIT 5;
+---------------+------------------+
| isbn          | title            |
+---------------+------------------+
| 9780770437404 | Books/Horror.csv |
| 9780380977277 | Books/Horror.csv |
| 9780385319676 | Books/Horror.csv |
| 9781416552963 | Books/Horror.csv |
| 9780316362269 | Books/Horror.csv |
+---------------+------------------+

Start the pipeline.

START PIPELINE books;

Check each row to verify that every one has a corresponding filename.

SELECT * FROM book_inventory;
+---------------+---------------------------+
| isbn          | title                     |
+---------------+---------------------------+
| 9780316137492 | Books/Nautical.csv        |
| 9780440117377 | Books/Horror.csv          |
| 9780297866374 | Books/Nautical.csv        |
| 9780006166269 | Books/Nautical.csv        |
| 9780721405971 | Books/Nautical.csv        |
| 9781416552963 | Books/Horror.csv          |
| 9780316362269 | Books/Horror.csv          |
| 9783104026886 | Books/Nautical.csv        |
| 9788496957879 | Books/Nautical.csv        |
| 9780380783601 | Books/Horror.csv          |
| 9780380973835 | Books/science_fiction.csv |
| 9780739462287 | Books/science_fiction.csv |
+---------------+---------------------------+
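The effect of SET title = pipeline_source_file() can be pictured as tagging every ingested value with the name of the file it came from. The following is only an illustration of the resulting rows, not of how SingleStore implements the helper; the function name tag_rows_with_source is hypothetical, and the sample contents reuse a few ISBNs from the output above.

```python
def tag_rows_with_source(files):
    """Pair each ISBN with the key of the file it was read from.

    files maps an object key to that file's contents (one ISBN per line).
    """
    rows = []
    for source_file, contents in files.items():
        for line in contents.splitlines():
            if line.strip():  # skip blank lines
                rows.append((int(line), source_file))
    return rows


sample_files = {
    "Books/Horror.csv": "9780770437404\n9780380977277\n",
    "Books/science_fiction.csv": "9780380973835\n",
}

tagged = tag_rows_with_source(sample_files)
for isbn, title in tagged:
    print(isbn, title)
```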

To load files from a specific folder in your S3 bucket while ignoring the files in the subfolders, use the '**' regular expression pattern as 's3://<bucket_name>/<folder_name>/**'. For example:

CREATE PIPELINE <your_pipeline> AS
LOAD DATA S3 's3://<bucket_name>/<folder_name>/**'
CONFIG '{"region":"<your_region>"}'
CREDENTIALS '{"aws_access_key_id": "<access_key_id>",  
              "aws_secret_access_key": "<secret_access_key>"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE <your_table>;

Using two asterisks (**) after the folder instructs the pipeline to load all of the files in the main folder and ignore the files in the subfolders. However, the files in the subfolders will get scanned when listing the contents of the bucket.

Next Steps

See Load Data with Pipelines to learn more about how pipelines work.

Last modified: July 4, 2024
