Load Data from HDFS Using a Pipeline

When you use HDFS pipelines, you extract data from an HDFS file path, optionally transform the data, and load it to a SingleStore table.

With Enabling Wire Encryption and Kerberos on HDFS Pipelines, you can encrypt your pipeline’s connection to HDFS and you can authenticate your pipeline using Kerberos. SingleStore supports Data Transfer Protocol (DTP), which encrypts your pipeline’s connection to HDFS.

This topic assumes that you have set up HDFS in your cluster and you are familiar with how HDFS works.

Note

HDFS Pipelines cannot run Hadoop jobs.

Creating and Starting an HDFS Pipeline

You create an HDFS pipeline by running the CREATE PIPELINE statement. You start a pipeline by running START PIPELINE

HDFS Pipeline Scenario

Imagine that your organization has numerous applications running on-premises. These applications generate lengthy log files that contain possible errors the applications generate as they run.

Your goal: Get a running weekly and monthly count of the number of errors per application.

To accomplish this, you follow these steps sequentially. The steps are explained in detail in the sections below.

  1. Copy the first day’s application log files into HDFS.

  2. Run a Hadoop job that processes the log files and outputs the results to one output file.

  3. Use an HDFS pipeline to extract the results and import them into a SingleStore table.

  4. Confirm that the SingleStore table contains the data for the first day. If the data is correct, run steps one through three continuously, at the end of every day.

Note

This scenario provides a hypothetical example of how you can use HDFS pipelines. It is a not a working example, as the application logs and the Hadoop job are not available.

Log File Format

Shown below is an example application log file containing selected errors from one day. The ... parts indicate lengthy sections containing errors that are removed for brevity.

...
[2019-03-01 09:30:08]: ERR-2030: File not found: file1.txt
...
[2019-03-01 12:15:35]: ERR-1010: Could not read configuration file conf_file.txt
...
[2019-03-01 14:00:10]: ERR-1520: Not enough memory available to open file file2.txt
...
[2019-03-01 16:40:35]: ERR-1010: Could not read configuration file conf_file10.txt
...
[2019-03-01 19:20:55]: ERR-1520: Not enough memory available to open file file15.txt
...

Your other applications generate log files that are formatted in a similar way. The errors contain the date and time of the error, the error code and the error message.

HDFS folder Setup

The HDFS folder /user/memsql/<application name> stores the input files, per application, that are processed by your Hadoop job. After your job runs, it outputs the resulting part-00000 file to the HDFS folder /user/memsql/output. The part-00000 filename has the current date appended to the end.

Running the Hadoop Job

Run the Hadoop job which extracts the error date and error code for each log entry. The job will write these fields, along with the application name, to a line in the output file.

An example output file is shown below.

App1, ERR-2030, 2019-03-01
App1, ERR-1010, 2019-03-01
App1, ERR-1520, 2019-03-01
App1, ERR-1010, 2019-03-01
App1, ERR-1520, 2019-03-01
App2, E-400, 2019-03-01
App2, E-250, 2019-03-01
App2, E-800, 2019-03-01
App2, E-400, 2019-03-01

Creating the Destination Table

Create the table where the data will be loaded from the pipeline.

CREATE TABLE app_errors (app_name TEXT, error_code TEXT, error_date DATE, SORT KEY (error_date));

The reason you use a columnstore table to store the application errors is because a columnstore is well suited for performing aggregate queries. Also, you will not be updating rows in the table once they are imported.

Creating Your Pipeline

Use the following statement to create a new pipeline named my_pipeline, where you reference the HDFS path /memsql/output/ as the data source, and import data into the app_errors table once the pipeline is started.

CREATE PIPELINE my_pipeline AS
LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/memsql/output/'
INTO TABLE app_errors
FIELDS TERMINATED BY ', ';

Should your CREATE PIPELINE statement fail, run SHOW WARNINGS. This command may provide information as to why the CREATE PIPELINE statement failed.

Testing Your Pipeline

After you run your CREATE PIPELINE statement successfully, run the tests in this section to confirm your pipeline is working.

The following query should return one row for each file that the pipeline has imported. In this case, the query will only return one row, since the Hadoop job generates one output file. For example, if you ran the job on January 1, 2019, FILE_NAME could have the value part-00000-20190101. The FILE_STATE field value should have the value Unloaded.

SELECT * FROM information_schema.pipelines_files WHERE pipeline_name = 'my_pipeline';

Run the following command to test if the master aggregator can connect to the HDFS namenode. If the test is successful, zero rows will be returned.

TEST PIPELINE my_pipeline LIMIT 0;

Run the following command to test if the SingleStore leaf nodes can connect to the HDFS datanodes. If the test is successful, one row will be returned.

TEST PIPELINE my_pipeline LIMIT 1;

Starting Your Pipeline

Assuming the tests you ran in the previous section succeeded, start your pipeline in the foreground:

START PIPELINE my_pipeline FOREGROUND LIMIT 1 BATCHES;

Starting your pipeline in the foreground allows you to see any errors, if they occur. After your pipelines ingests one batch of data into the app_errors table, your pipeline stops.

Run the following query, which should return one row with the part-00000-20190101 file. In this row, the FILE_STATE field should have the value Loaded, assuming no errors occurred when you started the pipeline.

SELECT * FROM information_schema.pipelines_files WHERE pipeline_name = 'my_pipeline';

Run SELECT * FROM app_errors; to view the records in the app_errors table. If your HDFS file /user/memsql/output/part-00000-20190101 contains the output as shown in the Running the Hadoop Job above, you will see nine records, each with an app_name, error_code and error_date field.

Syncing HDFS with Your Application Logs

Now that your Hadoop job generates an output file successfully and your pipeline imports the file successfully, you want to periodically copy your application log files to the /user/memsql/<application name> HDFS folders. To accomplish this, you write a script to automatically copy the files at the end of every day.

After you start the script, start your pipeline in the background to continuously ingest data into the app_errors table.

START PIPELINE my_pipeline;

Note

Foreground pipelines and background pipelines have different intended uses and behave differently. For more information, see START PIPELINE.

Finding the Weekly and Monthly Error Count

Recall that your goal is to find a running weekly and monthly count of the number of errors each application generates. Prior to creating a weekly and monthly query that returns these counts, you write a simpler query that returns the number of errors per application and error code, for all records in the table.

SELECT COUNT(*), app_name, error_code FROM app_errors
GROUP BY app_name, error_code

Using the previous query as a starting point, you write the monthly query:

SELECT COUNT(*), app_name, error_code, MONTH(error_date) FROM app_errors
GROUP BY app_name, error_code, MONTH(error_date)

Finally, you write the weekly query, using WEEK(error_date, 2) to specify that the week begins on Sunday:

SELECT COUNT(*), app_name, error_code, WEEK(error_date, 2) FROM app_errors
GROUP BY app_name, error_code, WEEK(error_date, 2)

Using Kerberos Cache Support

The Kerberos cache file, also referred to as the credential cache or ticket cache, is a file that stores the Kerberos authentication tickets obtained by a client during the authentication process. This cache file is typically used by Kerberos clients to store and manage the client's credentials for future authentication requests.

SingleStore supports using the Kerberos cache to reduce the number of requests to the Kerberos servers. By default, it creates the cache in /tmp/s2_krb5cc file, however, that can be changed by editing the pipeline configuration. Also, the ability to reuse Hadoop logins was implemented, which further reduces the amount of requests.

Begin by setting advanced_hdfs_pipelines to ON,  which is required to access Kerberized HDFS.

To enable reusing Hadoop logins, add the following lines to the CONFIG section JSON:

"allow_unknown_configs": true,
"kerberos.allow_login_reuse": true

It works best when all HDFS pipelines are using the same config.

Edit the CREATE PIPELINE syntax to modify the configuration for an existing pipeline. This will keep the information about already ingested files, unlike when creating a new pipeline.

Setting up caching is a little more complicated. Do the following:

  1. Install the Kerberos CLI client tools on all nodes.

  2. On each node, set up the /etc/krb5.conf file for the Kerberos server that is associated with the Hadoop cluster. The kinit -kt KEYTAB_FILE HADOOP_USER command should work with the keytab file and the Hadoop user used in a HDFS pipeline.

  3. Modify the pipeline config for it to include:

    "allow_unknown_configs": true,
    "kerberos.use_cache": true

Alternatively, a CREATE PIPELINE query may be used with the same definitions as in the initial CREATE but with a modified CONFIG section.

Some additional settings are available for Kerberos cache:

  • "kerberos.kinit_path": Can be used to set the path to the kinit executable, if it is not in $PATH.

  • "kerberos.cache_path": Path for the Kerberos cache, which is /tmp/s2_krb5cc by default.

  • "kerberos.cache_renewal_interval": Renewal interval for the cache, in seconds. The default interval is one hour. When the cache is older than this, new logins are performed via keytab.

Next Steps

See Load Data with Pipelines to learn more about how pipelines work.

Enabling Wire Encryption and Kerberos on HDFS Pipelines

In advanced HDFS Pipelines mode, you can encrypt your pipeline’s connection to HDFS and you can authenticate your pipeline using Kerberos. SingleStore supports Hadoop’s Data Transfer Protocol (DTP), which encrypts your pipeline’s connection to HDFS.

This topic assumes you have already have set up your HDFS cluster to use wire encryption and/or Kerberos. For information on how to set up wire encryption, see the DTP section in the Hadoop Secure Mode documentation. For information on how to set up your HDFS cluster to use Kerberos, see the Kerberos discussion in the Hadoop Secure Mode documentation.

To create an advanced HDFS pipeline, first set the advanced_hdfs_pipelinesEngine Variables to true on the master aggregator. Then, run a CREATE PIPELINE statement and pass in JSON attributes in the CONFIG clause. These attributes specify how to encrypt your pipeline’s connection to HDFS, how to authenticate your pipeline using Kerberos, or both.

Note

With advanced HDFS pipelines, you can enable debug logging. To do so, set the engine variable pipelines_extractor_debug_logging engine sync variable to true. This setting allows your pipeline to return error messages to the client application.

Wire Encryption

If encrypted DTP is enabled in your HDFS cluster, you can encrypt your pipeline’s connection to HDFS. To do this, create your CONFIG JSON that you will use in CREATE PIPELINE as follows:

  1. Set dfs.encrypt.data.transfer to true.

  2. Set the attributes dfs.encrypt.data.transfer.cipher.key.bitlength, dfs.encrypt.data.transfer.algorithm, and dfs.data.transfer.protection. Set these attribute’s values as they are specified your hdfs-site.xml file. Find a copy of this file on each node in your HDFS cluster.

The following example creates a pipeline that uses encrypted DTP to communicate with HDFS.

CREATE PIPELINE my_pipeline
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
CONFIG '{
"dfs.encrypt.data.transfer": true,
"dfs.encrypt.data.transfer.cipher.key.bitlength": 256,
"dfs.encrypt.data.transfer.algorithm": "rc4",
"dfs.data.transfer.protection": "authentication"
}'
INTO TABLE `my_table`
FIELDS TERMINATED BY '\t';

Authenticating with Kerberos

You can create an HDFS pipeline that authenticates with Kerberos. Prior to doing so, perform the following installation steps on every SingleStore leaf node. These steps use EXAMPLE.COM as the default realm and host.example.com as the fully qualified domain name (FQDN) of the KDC server.

Note

Perform the following steps on every SingleStore leaf node (referred to below as the “node”). An exception is step three; perform this step on the KDC server, only.

  1. Install version 1.8 or later of the Java Runtime Environment (JRE). The JRE version installed should match the JRE version installed on the HDFS nodes.

  2. Tell SingleStore the path where the JRE binary files have been installed. An example path is /usr/bin/java/jre1.8.2_12/bin. Specify the path using one of the two following methods:

    Method 1: Add the path to your operating system’s PATH environment variable.

    Method 2: Set the engine variables java_pipelines_java_path and java_pipelines_java_home to the path.

  3. On the KDC server, create a SingleStore service principal (e.g. memsql/host.example.com@EXAMPLE.COM) and a keytab file containing the SingleStore service principal.

  4. Securely copy the keytab file containing the SingleStore service principal from the KDC server to the node. You should use a secure file transfer method, such as scp, to copy the keytab file to your node. The file location on your node should be consistent across all nodes in the cluster.

  5. Ensure that the Linux service account used to run SingleStore on the node can access the copied keytab file. This can be accomplished by changing file ownership or permissions. If this account cannot access the keytab file, you will not be able to complete the next step because your master aggregator will not be able to restart after applying configuration updates.

  6. When authenticating with Kerberos, SingleStore needs to authenticate as a client, which means you must also install a Kerberos client on your node.

    The following command installs the client on Debian-based Linux distributions.

    sudo apt-get update && apt-get install krb5-user

    The following command installs the client on RHEL/CentOS:

    yum install krb5-workstation
  7. Configure your Kerberos client to connect to the KDC server. In your node’s /etc/krb5.conf file, set your default realm, Kerberos admin server, and other options to those defined by your KDC server.

  8. Make sure your node can connect to the KDC server using the fully-qualified domain name (FQDN) of the KDC server. This FQDN is found in the /etc/krb5.conf file. This might require configuring network settings or updating /etc/hosts on your node.

  9. Ensure that your node can access every HDFS datanode, using the FQDN or IP by which the HDFS namenode accesses the datanode. The FQDN is typically used.

  10. Specify the path of your keytab file in the kerberos.keytab attribute of your CONFIG JSON that you will pass to your CREATE PIPELINE statement.

  11. In your CONFIG JSON, add the attributes dfs.datanode.kerberos.principal and dfs.namenode.kerberos.principal. Set these attribute’s values as they are specified your hdfs-site.xml file. Find a copy of this file on each node in your HDFS cluster.

Example CREATE PIPELINE Statement Using Kerberos

The following example demonstrates how to create an HDFS pipeline that authenticates using Kerberos. Assume that port 8020 is the HDFS endpoint.

CREATE PIPELINE my_pipeline
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
CONFIG '{
"hadoop.security.authentication": "kerberos",
"kerberos.user": "memsql/host.example.com@EXAMPLE.COM",
"kerberos.keytab": "/path/to/kerberos.keytab",
"dfs.client.use.datanode.hostname": true,
"dfs.datanode.kerberos.principal": "datanode_principal/_HOST@EXAMPLE.COM",
"dfs.namenode.kerberos.principal": "namenode_principal/_HOST@EXAMPLE.COM"
}'
INTO TABLE `my_table`
FIELDS TERMINATED BY '\t';

Last modified: September 26, 2023

Was this article helpful?