Load Data from HDFS Using a Pipeline
When you use HDFS pipelines, you extract data from an HDFS file path, optionally transform the data, and load it to a SingleStore table.
By Enabling Wire Encryption and Kerberos on HDFS Pipelines, you can encrypt your pipeline’s connection to HDFS and authenticate your pipeline using Kerberos.
This topic assumes that you have set up HDFS in your cluster and you are familiar with how HDFS works.
Note
HDFS Pipelines cannot run Hadoop jobs.
Creating and Starting an HDFS Pipeline
You create an HDFS pipeline by running the CREATE PIPELINE statement.
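A minimal sketch of the general form is shown below; the pipeline, host, path, and table names here are hypothetical placeholders, and the concrete statement for this scenario appears later in this topic.
-- All names below are hypothetical placeholders; substitute your own.
CREATE PIPELINE example_pipeline AS
LOAD DATA HDFS 'hdfs://example-namenode:8020/path/to/data/'
INTO TABLE example_table
FIELDS TERMINATED BY ',';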
HDFS Pipeline Scenario
Imagine that your organization has numerous applications running on-premises.
Your goal: Get a running weekly and monthly count of the number of errors per application.
To accomplish this, you follow these steps sequentially.
1. Copy the first day’s application log files into HDFS.
2. Run a Hadoop job that processes the log files and outputs the results to one output file.
3. Use an HDFS pipeline to extract the results and import them into a SingleStore table.
4. Confirm that the SingleStore table contains the data for the first day.
If the data is correct, run steps one through three continuously, at the end of every day.
Note
This scenario provides a hypothetical example of how you can use HDFS pipelines.
Log File Format
Shown below is an example application log file containing selected errors from one day. The ... parts indicate lengthy sections containing errors that have been removed for brevity.
...
[2019-03-01 09:30:08]: ERR-2030: File not found: file1.txt
...
[2019-03-01 12:15:35]: ERR-1010: Could not read configuration file conf_file.txt
...
[2019-03-01 14:00:10]: ERR-1520: Not enough memory available to open file file2.txt
...
[2019-03-01 16:40:35]: ERR-1010: Could not read configuration file conf_file10.txt
...
[2019-03-01 19:20:55]: ERR-1520: Not enough memory available to open file file15.txt
...
Your other applications generate log files that are formatted in a similar way.
HDFS Folder Setup
The HDFS folder /user/memsql/<application name> stores the input files, per application, that are processed by your Hadoop job. The job writes its results as a part-00000 file to the HDFS folder /user/memsql/output. The part-00000 filename has the current date appended to the end.
Running the Hadoop Job
Run the Hadoop job, which extracts the error date and error code from each log entry.
An example output file is shown below.
App1, ERR-2030, 2019-03-01
App1, ERR-1010, 2019-03-01
App1, ERR-1520, 2019-03-01
App1, ERR-1010, 2019-03-01
App1, ERR-1520, 2019-03-01
App2, E-400, 2019-03-01
App2, E-250, 2019-03-01
App2, E-800, 2019-03-01
App2, E-400, 2019-03-01
Creating the Destination Table
Create the table that the pipeline will load the data into.
CREATE TABLE app_errors (app_name TEXT, error_code TEXT, error_date DATE, SORT KEY (error_date));
You use a columnstore table to store the application errors because a columnstore is well suited for performing aggregate queries.
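For instance, an aggregate query of the following shape (an illustrative sketch; the date range is hypothetical) reads many rows but only a few columns, which is the access pattern a columnstore handles well:
-- Hypothetical date range, for illustration only.
SELECT app_name, COUNT(*) AS error_count
FROM app_errors
WHERE error_date BETWEEN '2019-03-01' AND '2019-03-31'
GROUP BY app_name;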
Creating Your Pipeline
Use the following statement to create a new pipeline named my_pipeline, where you reference the HDFS path /user/memsql/output/ as the data source, and import data into the app_errors table once the pipeline is started.
CREATE PIPELINE my_pipeline AS
LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/user/memsql/output/'
INTO TABLE app_errors
FIELDS TERMINATED BY ', ';
Should your CREATE PIPELINE statement fail, run SHOW WARNINGS. This statement may show the reason why your CREATE PIPELINE statement failed.
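SHOW WARNINGS;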
Testing Your Pipeline
After you run your CREATE PIPELINE
statement successfully, run the tests in this section to confirm your pipeline is working.
The following query should return one row for each file that the pipeline has imported. For example, the FILE_NAME field could have the value part-00000-20190101. The FILE_STATE field should have the value Unloaded.
SELECT * FROM information_schema.pipelines_files WHERE pipeline_name = 'my_pipeline';
Run the following command to test if the master aggregator can connect to the HDFS namenode.
TEST PIPELINE my_pipeline LIMIT 0;
Run the following command to test if the SingleStore leaf nodes can connect to the HDFS datanodes.
TEST PIPELINE my_pipeline LIMIT 1;
Starting Your Pipeline
Assuming the tests you ran in the previous section succeeded, start your pipeline in the foreground:
START PIPELINE my_pipeline FOREGROUND LIMIT 1 BATCHES;
Starting your pipeline in the foreground allows you to see any errors, if they occur. After your pipeline ingests one batch of data into the app_errors table, it stops.
Run the following query, which should return one row for the part-00000-20190101 file. The FILE_STATE field should have the value Loaded, assuming no errors occurred when you started the pipeline.
SELECT * FROM information_schema.pipelines_files WHERE pipeline_name = 'my_pipeline';
Run SELECT * FROM app_errors to view the records in the app_errors table. Assuming /user/memsql/output/part-00000-20190101 contains the output shown in Running the Hadoop Job above, you will see nine records, each with an app_name, error_code, and error_date field.
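SELECT * FROM app_errors;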
Syncing HDFS with Your Application Logs
Now that your Hadoop job generates an output file successfully and your pipeline imports the file successfully, you want to periodically copy your application log files to the /user/memsql/<application name> HDFS folders. Write a script that performs this copy at the end of each day. After you start the script, start your pipeline in the background to continuously ingest data into the app_errors table.
START PIPELINE my_pipeline;
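To check on the pipeline while it runs in the background, you can query the information_schema.pipelines view; this sketch assumes its pipeline_name and state columns:
SELECT pipeline_name, state
FROM information_schema.pipelines
WHERE pipeline_name = 'my_pipeline';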
Note
Foreground pipelines and background pipelines have different intended uses and behave differently.
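For example, a foreground pipeline started with a LIMIT stops on its own once the limit is reached, while a background pipeline runs until you stop it explicitly:
STOP PIPELINE my_pipeline;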
Finding the Weekly and Monthly Error Count
Recall that your goal is to find a running weekly and monthly count of the number of errors each application generates. Start with a query that counts the errors per application and error code:
SELECT COUNT(*), app_name, error_code FROM app_errors
GROUP BY app_name, error_code;
Using the previous query as a starting point, you write the monthly query:
SELECT COUNT(*), app_name, error_code, MONTH(error_date) FROM app_errors
GROUP BY app_name, error_code, MONTH(error_date);
Finally, you write the weekly query, using WEEK(error_date, 2) to specify that the week begins on Sunday:
SELECT COUNT(*), app_name, error_code, WEEK(error_date, 2) FROM app_errors
GROUP BY app_name, error_code, WEEK(error_date, 2);
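As an optional refinement (the aliases are illustrative, not part of the original scenario), you can alias the aggregate and sort the weekly results so the largest error counts appear first:
SELECT COUNT(*) AS error_count, app_name, error_code, WEEK(error_date, 2) AS week_of_year
FROM app_errors
GROUP BY app_name, error_code, WEEK(error_date, 2)
ORDER BY error_count DESC;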
Next Steps
See About SingleStore Pipelines to learn more about how pipelines work.