Load Data from Apache Beam
The SingleStore I/O connector allows you to access your SingleStore databases from Apache Beam using the SingleStore JDBC Driver.
For information on Apache Beam concepts, see Apache Beam Documentation.
Prerequisites
To use the SingleStore I/O connector, add the following Maven artifact dependency to your pom.xml file:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>{{< param release_latest >}}</version>
</dependency>
Configure the Data Source
To configure the SingleStore I/O connection, create a DataSourceConfiguration object with the connection information in the following format:
SingleStoreIO.DataSourceConfiguration.create("<hostname>:<port>").withDatabase("<database_name>").withConnectionProperties("<connection_properties>").withPassword("<password>").withUsername("<username>");
| Parameter | Description |
|---|---|
| .create("<hostname>:<port>") | Specifies the hostname or IP address of the connection in the <hostname>:<port> format. |
| .withDatabase() | Specifies the database name. |
| .withConnectionProperties() | Specifies a list of connection string parameters for the JDBC driver in the key1=value1;key2=value2 format. |
| .withUsername() | Specifies the username of the SingleStore user. |
| .withPassword() | Specifies the password of the SingleStore user. |
Here's a sample configuration:
SingleStoreIO.DataSourceConfiguration.create("svchost:3306").withDatabase("dbTest").withConnectionProperties("connectTimeout=30000;useServerPrepStmts=FALSE").withPassword("passw0rd").withUsername("s2user");
SingleStore I/O Transforms
SingleStoreIO supports the following PTransforms:

- Read: Reads data sequentially by executing a single query.
- ReadWithPartitions: Reads data in parallel.
- Write: Writes data using LOAD DATA queries.
All read transforms return bounded PCollections. The write transform can write both bounded and unbounded PCollections.
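To show how these transforms fit together, here is a minimal sketch of a pipeline that copies rows from one table to another. The table names are hypothetical, and <dataSourceConfiguration>, <rowMapper>, and <userDataMapper> stand for the objects described in the sections that follow:

// Sketch only: "sourceTable" and "targetTable" are hypothetical table names,
// and the <...> placeholders are built as shown later in this guide.
Pipeline pipeline = Pipeline.create();

PCollection<USER_DATA_TYPE> rows = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("sourceTable")
        .withRowMapper(<rowMapper>));

rows.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("targetTable")
        .withUserDataMapper(<userDataMapper>));

// Execute the pipeline.
pipeline.run().waitUntilFinish();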
Read from SingleStore
The SingleStore I/O connector supports the following operations to read from tables:
- read() for sequential data reading, and
- readWithPartitions() for reading data in parallel.
Sequential Data Reading with read()
PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withStatementPreparator(<statementPreparator>)
        .withOutputParallelization(<value>)
        .withRowMapper(<rowMapper>));
Note
You must specify either the withTable() or the withQuery() parameter.
| Parameter | Description |
|---|---|
| .withDataSourceConfiguration() | Specifies the DataSourceConfiguration object. |
| .withTable() | Specifies the table to read data from. |
| .withQuery() | Specifies the query to execute. |
| .withStatementPreparator() | Specifies the StatementPreparator object. |
| .withRowMapper() | Specifies the RowMapper function. |
| .withOutputParallelization() | Indicates if the resulting PCollection is reshuffled. |
Here's an example:
SingleStoreIO.read()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withQuery("SELECT a*a FROM t")
    .withStatementPreparator(statementPreparator)
    .withOutputParallelization(true)
    .withRowMapper(rowMapper)
Parallel Data Reading with readWithPartitions()
PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>readWithPartitions()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withRowMapper(<rowMapper>));
Note
You must specify either the withTable() or the withQuery() parameter. Additionally, you must specify the withDatabase() parameter in the DataSourceConfiguration.
| Parameter | Description |
|---|---|
| .withDataSourceConfiguration() | Specifies the DataSourceConfiguration object. |
| .withTable() | Specifies the table to read data from. |
| .withQuery() | Specifies the query to execute. |
| .withRowMapper() | Specifies the RowMapper function. |
Here's an example:
SingleStoreIO.readWithPartitions()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withTable("t1")
    .withRowMapper(rowMapper)
StatementPreparator
The StatementPreparator object implements an interface that is used by read() to set the parameters of the PreparedStatement.
public static class MyStatementPreparator implements SingleStoreIO.StatementPreparator {
    @Override
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setInt(1, 10);
    }
}
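As an illustration, a preparator like this pairs with a parameterized query passed to read(); the ? placeholder in the query is bound by setParameters(). The table and column names in the following sketch are hypothetical:

// Sketch only: "t" and its columns are hypothetical, and the <...> placeholders
// stand for the objects described in this guide.
SingleStoreIO.<USER_DATA_TYPE>read()
    .withDataSourceConfiguration(<dataSourceConfiguration>)
    .withQuery("SELECT ID, Name FROM t WHERE ID > ?") // ? is set to 10 by MyStatementPreparator
    .withStatementPreparator(new MyStatementPreparator())
    .withRowMapper(<rowMapper>);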
RowMapper
The RowMapper function is used to convert each row of the ResultSet into an element of the resulting PCollection.
public static class MyRowMapper implements SingleStoreIO.RowMapper<MyRow> {
    @Override
    public MyRow mapRow(ResultSet resultSet) throws Exception {
        return MyRow.create(resultSet.getInt(1), resultSet.getString(2));
    }
}
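The MyRow type referenced above is not defined in this guide; a minimal plain-Java sketch of such a value class, assuming an integer ID and a string name, could look like this:

// Hypothetical value class for the MyRow examples; any immutable class
// (or an AutoValue class / record) with matching accessors works the same way.
// Implements Serializable so Beam can encode it with SerializableCoder.
public class MyRow implements java.io.Serializable {
    private final Integer id;
    private final String name;

    private MyRow(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public static MyRow create(Integer id, String name) {
        return new MyRow(id, name);
    }

    public Integer id() { return id; }
    public String name() { return name; }
}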
Write to SingleStore
The write() transformation sends your PCollection object to your SingleStore database.
data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>")
        .withUserDataMapper(<userDataMapper>)
        .withBatchSize(<batch_size>));
| Parameter | Description |
|---|---|
| .withDataSourceConfiguration() | Specifies the DataSourceConfiguration object. |
| .withTable() | Specifies the table to write data to. |
| .withBatchSize() | Specifies the number of rows loaded by a single LOAD DATA query. |
| .withUserDataMapper() | Specifies the UserDataMapper function. |
Here's an example:
SingleStoreIO.write()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withTable("t")
    .withBatchSize(1000)
    .withUserDataMapper(userDataMapper)
UserDataMapper
This function maps data from PCollection objects to an array of Strings, which is eventually converted into a set of CSV values and loaded into SingleStore.
public static class MyRowDataMapper implements SingleStoreIO.UserDataMapper<MyRow> {
    @Override
    public List<String> mapRow(MyRow element) {
        List<String> res = new ArrayList<>();
        res.add(element.id().toString());
        res.add(element.name());
        return res;
    }
}
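For instance, with the hypothetical MyRow value class sketched in the RowMapper section, an element with ID 5 and name Erik maps to the field list ["5", "Erik"], which the connector serializes as the CSV line 5,Erik before loading it with a LOAD DATA query:

// Illustration only, using the hypothetical MyRow value class sketched earlier.
MyRow row = MyRow.create(5, "Erik");
List<String> fields = new MyRowDataMapper().mapRow(row);
// fields contains ["5", "Erik"]; the connector writes it as the CSV line "5,Erik".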
Example
The following example shows how to connect to a SingleStore cluster from Apache Beam using the SingleStore I/O connector and perform read/write operations.
Connect to your SingleStore cluster, and run the following commands:
CREATE DATABASE dbExample;
USE dbExample;
CREATE TABLE readExample (ID INT, Name VARCHAR(32));
CREATE TABLE writeExample (ID INT, Name VARCHAR(32));
INSERT INTO readExample VALUES
    (1, 'Adam'),(2, 'Bill'),(3, 'Clark'),(4, 'Dennis'),
    (5, 'Erik'),(6, 'Finn'),(7, 'Grace'),(8, 'Henry');
Create a Maven project, and add the following dependencies to your pom.xml file:
<groupId>com.s2</groupId>
<artifactId>s2BeamExample</artifactId>
<version>1.0</version>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.44.0</version>
        <scope>runtime</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.singlestore/singlestore-jdbc-client -->
    <dependency>
        <groupId>com.singlestore</groupId>
        <artifactId>singlestore-jdbc-client</artifactId>
        <version>1.1.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-singlestore -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-singlestore</artifactId>
        <version>2.44.0</version>
    </dependency>
</dependencies>
Read Operation Example
Add the following Java code to the S2ReadExample class. Once the data is loaded in the PCollection, it is converted from key/value pairs to strings and written to a CSV file.
Pipeline pipeline = Pipeline.create();

PCollection<KV<Integer, String>> data = pipeline.apply(
    SingleStoreIO.<KV<Integer, String>>read()
        .withDataSourceConfiguration(DataSourceConfiguration.create("svchost")
            .withUsername("s2user")
            .withPassword("pa55w0rd")
            .withDatabase("dbExample"))
        .withQuery("SELECT * FROM readExample WHERE ID > ?")
        .withStatementPreparator(new StatementPreparator() {
            public void setParameters(PreparedStatement preparedStatement) throws Exception {
                preparedStatement.setInt(1, 4);
            }
        })
        .withRowMapper(new RowMapper<KV<Integer, String>>() {
            public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                return KV.of(resultSet.getInt(1), resultSet.getString(2));
            }
        }));

data.apply(MapElements.into(TypeDescriptors.strings())
        .via((KV<Integer, String> kv) -> kv.getKey() + "," + kv.getValue()))
    .apply(TextIO.write().to("/path/to/output").withNumShards(1).withSuffix(".csv"));

// Execute the pipeline.
pipeline.run().waitUntilFinish();
Run the following commands:
mvn clean compile assembly:single
java com.s2.beam.S2ReadExample
After the code runs successfully, the resulting CSV file contains the following data:
5,Erik
6,Finn
7,Grace
8,Henry
Write Operation Example
Add the following Java code to the S2WriteExample class.
Pipeline pipeline = Pipeline.create();

PCollection<String> lines = pipeline.apply(TextIO.read().from("/path/to/output.csv"));

PCollection<KV<Integer, String>> keyValues = lines.apply(
    MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
        .via((String line) -> {
            String[] fields = line.split(",");
            return KV.of(Integer.parseInt(fields[0]), fields[1]);
        }));

keyValues.apply(
    SingleStoreIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(DataSourceConfiguration.create("svchost")
            .withUsername("s2user")
            .withPassword("pa55w0rd")
            .withDatabase("dbExample"))
        .withTable("writeExample")
        .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
            public List<String> mapRow(KV<Integer, String> element) {
                List<String> result = new ArrayList<>();
                result.add(element.getKey().toString());
                result.add(element.getValue());
                return result;
            }
        }));

// Execute the pipeline.
pipeline.run().waitUntilFinish();
Run the following commands:
mvn clean compile assembly:single
java com.s2.beam.S2WriteExample
After the code runs successfully, run the following SQL query:
SELECT * FROM writeExample;
+----+-------+
| ID | Name |
+----+-------+
| 5 | Erik |
| 6 | Finn |
| 7 | Grace |
| 8 | Henry |
+----+-------+
The contents of the CSV file are now inserted into the writeExample table.