Load Data from Apache Beam

The SingleStore I/O connector allows you to access your SingleStore databases from Apache Beam using the SingleStore JDBC driver. You can also use this connector to export data from your SingleStore databases to Apache Beam.

For information on Apache Beam concepts, see Apache Beam Documentation.

Prerequisites

To use the SingleStore I/O connector, add the following Maven artifact dependency to your pom.xml file:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>2.44.0</version>
</dependency>

Configure the Data Source

To configure the SingleStore I/O connection, create a data source configuration. Define a DataSourceConfiguration object with the connection information in the following format:

SingleStoreIO.DataSourceConfiguration
    .create("<hostname>:<port>")
    .withDatabase("<database_name>")
    .withConnectionProperties("<connection_properties>")
    .withPassword("<password>")
    .withUsername("<username>");

  • .create() (Required): Specifies the hostname or IP address of the connection in the "<hostname>:<port>" format. The default port is 3306.

  • .withDatabase(): Specifies the database name. If this parameter is set, the connection uses the specified database.

  • .withConnectionProperties(): Specifies a list of connection string parameters for the JDBC driver in the "<parameter>=<value>[;<parameter>=<value>...]" format.

  • .withUsername(): Specifies the username of the SingleStore user.

  • .withPassword(): Specifies the password of the SingleStore user.

Here's a sample configuration:

SingleStoreIO.DataSourceConfiguration
    .create("svchost:3306")
    .withDatabase("dbTest")
    .withConnectionProperties("connectTimeout=30000;useServerPrepStmts=FALSE")
    .withPassword("passw0rd")
    .withUsername("s2user");

SingleStore I/O Transforms

The SingleStore I/O connector supports the following PTransforms:

  • Read: Reads data sequentially by executing a single query.

  • ReadWithPartitions: Reads data in parallel.

  • Write: Writes data using LOAD DATA queries.

All read transforms return bounded PCollections. Write transforms can write both bounded and unbounded PCollections.

Read from SingleStore

The SingleStore I/O connector supports the following operations to read from tables:

  • .read() for sequential data reading, and

  • .readWithPartitions() for reading data in parallel.

Sequential Data Reading with .read()

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withStatementPreparator(<statementPreparator>)
        .withOutputParallelization(<value>)
        .withRowMapper(<rowMapper>)
);

Note

You must specify either the withTable() or the withQuery() parameter.

  • .withDataSourceConfiguration(<dataSourceConfiguration>) (Required): Specifies the DataSourceConfiguration object. See Configure the Data Source for more information.

  • .withTable("<table_name>"): Specifies the table to read data from.

  • .withQuery("<query_text>"): Specifies the query to execute.

  • .withStatementPreparator(<statementPreparator>): Specifies the StatementPreparator object.

  • .withRowMapper(<rowMapper>) (Required): Specifies the RowMapper object.

  • .withOutputParallelization(<value>): Indicates whether the resulting PCollection is reshuffled. By default, it is set to true.

Here's an example:

SingleStoreIO.read()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db")
    )
    .withQuery("SELECT a*a FROM t")
    .withStatementPreparator(statementPreparator)
    .withOutputParallelization(true)
    .withRowMapper(rowMapper)

Parallel Data Reading with .readWithPartitions()

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>readWithPartitions()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withRowMapper(<rowMapper>)
);

Note

You must specify either the withTable() or the withQuery() parameter. Additionally, you must specify the .withDatabase() parameter in the DataSourceConfiguration.

  • .withDataSourceConfiguration(<dataSourceConfiguration>) (Required): Specifies the DataSourceConfiguration object. See Configure the Data Source for more information.

  • .withTable("<table_name>"): Specifies the table to read data from.

  • .withQuery("<query_text>"): Specifies the query to execute.

  • .withRowMapper(<rowMapper>) (Required): Specifies the RowMapper function.

Here's an example:

SingleStoreIO.readWithPartitions()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db")
    )
    .withTable("t1")
    .withRowMapper(rowMapper)

StatementPreparator

StatementPreparator is an interface used by .read() to set the parameters of the PreparedStatement. Here's an example implementation:

public static class MyStatementPreparator implements SingleStoreIO.StatementPreparator {
    @Override
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setInt(1, 10);
    }
}
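
The preparator only matters when the query contains "?" placeholders. As a minimal sketch of how the two fit together (the pipeline and configuration variables, and the use of MyRowMapper from the next section, are illustrative assumptions, not connector requirements), the class above could be wired into a read like this:

PCollection<MyRow> rows = pipeline.apply(
    SingleStoreIO.<MyRow>read()
        .withDataSourceConfiguration(dataSourceConfiguration)
        // The "?" placeholder is filled in by MyStatementPreparator,
        // which sets parameter 1 to 10 in the example above.
        .withQuery("SELECT ID, Name FROM readExample WHERE ID > ?")
        .withStatementPreparator(new MyStatementPreparator())
        .withRowMapper(new MyRowMapper())
);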

RowMapper

The RowMapper function is used to convert each row of the ResultSet into an element of the resulting PCollection. Here's an example:

public static class MyRowMapper implements SingleStoreIO.RowMapper<MyRow> {
    @Override
    public MyRow mapRow(ResultSet resultSet) throws Exception {
        return MyRow.create(resultSet.getInt(1), resultSet.getString(2));
    }
}
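
MyRow is not part of the connector; it stands in for whatever user type you map rows into. A minimal sketch of such a class (the fields and factory method are assumptions chosen to match the mapper above and the UserDataMapper example later in this article):

public class MyRow implements java.io.Serializable {
    // Serializable so Beam can encode PCollection<MyRow> elements
    // (for example, with SerializableCoder).
    private final Integer id;
    private final String name;

    private MyRow(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    // Factory method used by MyRowMapper above.
    public static MyRow create(Integer id, String name) {
        return new MyRow(id, name);
    }

    public Integer id() {
        return id;
    }

    public String name() {
        return name;
    }
}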

Write to SingleStore

The .write() transform writes your PCollection to your SingleStore database. It returns the number of rows written by each batch of elements. Use the following format:

data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>")
        .withUserDataMapper(<userDataMapper>)
        .withBatchSize(<batch_size>)
);

  • .withDataSourceConfiguration(<dataSourceConfiguration>) (Required): Specifies the DataSourceConfiguration object. See Configure the Data Source for more information.

  • .withTable("<table_name>") (Required): Specifies the table to write data to.

  • .withBatchSize(<batch_size>): Specifies the number of rows loaded by a single LOAD DATA query. The default value is 100000.

  • .withUserDataMapper(<userDataMapper>) (Required): Specifies the UserDataMapper function.

Here's an example:

SingleStoreIO.write()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db")
    )
    .withTable("t")
    .withBatchSize(1000)
    .withUserDataMapper(userDataMapper)
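
Because .write() returns the number of rows written by each batch (as noted above), the result of applying the transform can be captured if you want to inspect or log it. A minimal sketch, assuming a PCollection<MyRow> named data and the MyRowDataMapper class shown in the next section:

// Each element of rowsWritten is the row count of one LOAD DATA batch.
PCollection<Integer> rowsWritten = data.apply(
    SingleStoreIO.<MyRow>write()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .withTable("t")
        .withBatchSize(1000)
        .withUserDataMapper(new MyRowDataMapper())
);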

UserDataMapper

This function maps each element of the PCollection to a list of Strings, which is eventually converted into a set of CSV values and loaded into SingleStore. Here's an example:

public static class MyRowDataMapper implements SingleStoreIO.UserDataMapper<MyRow> {
    @Override
    public List<String> mapRow(MyRow element) {
        List<String> res = new ArrayList<>();
        res.add(element.id().toString());
        res.add(element.name());
        return res;
    }
}

Example

The following example shows how to connect to a SingleStore Helios workspace from Apache Beam using the SingleStore I/O connector and perform read/write operations.

Connect to your SingleStore Helios workspace, and run the following commands:

CREATE DATABASE dbExample;
USE dbExample;
CREATE TABLE readExample (
    ID INT,
    Name VARCHAR(32)
);
CREATE TABLE writeExample (
    ID INT,
    Name VARCHAR(32)
);
INSERT INTO readExample VALUES
    (1, 'Adam'),
    (2, 'Bill'),
    (3, 'Clark'),
    (4, 'Dennis'),
    (5, 'Erik'),
    (6, 'Finn'),
    (7, 'Grace'),
    (8, 'Henry');

Create a Maven project, and add the following dependencies to your pom.xml file:

<groupId>com.s2</groupId>
<artifactId>s2BeamExample</artifactId>
<version>1.0</version>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.44.0</version>
        <scope>runtime</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.singlestore/singlestore-jdbc-client -->
    <dependency>
        <groupId>com.singlestore</groupId>
        <artifactId>singlestore-jdbc-client</artifactId>
        <version>1.1.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-singlestore -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-singlestore</artifactId>
        <version>2.44.0</version>
    </dependency>
</dependencies>
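
The mvn clean compile assembly:single command used in the steps below also assumes the Maven Assembly Plugin is configured in this pom.xml. A minimal sketch of such a configuration (the mainClass value is an assumption; switch it to the class you intend to run, for example com.s2.beam.S2WriteExample for the write example):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.s2.beam.S2ReadExample</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>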

Read Operation Example

Add the following Java code to the S2ReadExample class. It defines the data source configuration for the SingleStore I/O connection and reads the values from the readExample table as key/value pairs.

Once the data is loaded in the PCollection, it is converted from key/value pairs to strings and written to a .csv file.

Pipeline pipeline = Pipeline.create();
PCollection<KV<Integer, String>> data = pipeline.apply(
    SingleStoreIO.<KV<Integer, String>>read()
        .withDataSourceConfiguration(DataSourceConfiguration
            .create("svchost")
            .withUsername("s2user")
            .withPassword("pa55w0rd")
            .withDatabase("dbExample"))
        .withQuery("SELECT * FROM readExample WHERE ID > ?")
        .withStatementPreparator(new StatementPreparator() {
            public void setParameters(PreparedStatement preparedStatement) throws Exception {
                preparedStatement.setInt(1, 4);
            }
        })
        .withRowMapper(new RowMapper<KV<Integer, String>>() {
            public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                return KV.of(resultSet.getInt(1), resultSet.getString(2));
            }
        })
);
data
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((KV<Integer, String> kv) -> kv.getKey() + "," + kv.getValue()))
    .apply(TextIO
        .write().to("/path/to/output").withNumShards(1).withSuffix(".csv")
    );
// Execute the pipeline and wait for it to finish.
pipeline.run().waitUntilFinish();

Run the following commands:

mvn clean compile assembly:single
java com.s2.beam.S2ReadExample

After the code runs successfully, the resulting CSV file contains the following data:

5,Erik
6,Finn
7,Grace
8,Henry

Write Operation Example

Add the following Java code to the S2WriteExample class. It defines the data source configuration for the SingleStore I/O connection, reads the data from the CSV file (created in the example above), and writes it into the writeExample table.

Pipeline pipeline = Pipeline.create();
PCollection<String> lines = pipeline.apply(
    TextIO.read().from("/path/to/output.csv"));
PCollection<KV<Integer, String>> keyValues = lines.apply(
    MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
        .via((String line) -> {
            String[] fields = line.split(",");
            return KV.of(Integer.parseInt(fields[0]), fields[1]);
        })
);
keyValues.apply(SingleStoreIO.<KV<Integer, String>>write()
    .withDataSourceConfiguration(DataSourceConfiguration
        .create("svchost")
        .withUsername("s2user")
        .withPassword("pa55w0rd")
        .withDatabase("dbExample"))
    .withTable("writeExample")
    .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
        public List<String> mapRow(KV<Integer, String> element) {
            List<String> result = new ArrayList<>();
            result.add(element.getKey().toString());
            result.add(element.getValue());
            return result;
        }
    })
);
// Execute the pipeline and wait for it to finish.
pipeline.run().waitUntilFinish();

Run the following commands:

mvn clean compile assembly:single
java com.s2.beam.S2WriteExample

After the code runs successfully, run the following SQL query:

SELECT * FROM writeExample;
+----+-------+
| ID | Name  |
+----+-------+
|  5 | Erik  |
|  6 | Finn  |
|  7 | Grace |
|  8 | Henry |
+----+-------+

The contents of the CSV file are now inserted into the writeExample table.

