Load Data from Apache Beam
Warning
SingleStore 9.0 gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features prior to their general availability. In the interim, SingleStore 8.9 is recommended for production workloads; it can later be upgraded to SingleStore 9.0.
The SingleStore I/O connector allows you to access your SingleStore databases from Apache Beam using the SingleStore JDBC Driver.
For information on Apache Beam concepts, see Apache Beam Documentation.
Prerequisites
To use the SingleStore I/O connector, add the following Maven artifact dependency to your pom.xml file:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>2.44.0</version>
</dependency>
Configure the Data Source
To configure the SingleStore I/O connection, create a DataSourceConfiguration object with the connection information in the following format:

SingleStoreIO.DataSourceConfiguration.create("<hostname>:<port>")
    .withDatabase("<database_name>")
    .withConnectionProperties("<connection_properties>")
    .withPassword("<password>")
    .withUsername("<username>");
| Parameter | Description |
|---|---|
| .create("<hostname>:<port>") | Specifies the hostname or IP address of the connection in the <hostname>:<port> format. |
| .withDatabase("<database_name>") | Specifies the database name. |
| .withConnectionProperties("<connection_properties>") | Specifies a list of connection string parameters for the JDBC driver in the key=value format, delimited by semicolons (;). |
| .withUsername("<username>") | Specifies the username of the SingleStore user. |
| .withPassword("<password>") | Specifies the password of the SingleStore user. |
Here's a sample configuration:
SingleStoreIO.DataSourceConfiguration.create("svchost:3306").withDatabase("dbTest").withConnectionProperties("connectTimeout=30000;useServerPrepStmts=FALSE").withPassword("passw0rd").withUsername("s2user");
SingleStore I/O Transforms
The SingleStore I/O connector supports the following PTransforms:

- Read: Reads data sequentially by executing a single query.
- ReadWithPartitions: Reads data in parallel.
- Write: Writes data using LOAD DATA queries.

All read transforms return bounded PCollections. The sketch below shows how these transforms fit together in a single pipeline.
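For orientation, here is a minimal sketch that chains these transforms in one pipeline: it reads rows from one table and writes them to another. The host, credentials, and table names are illustrative assumptions, the mappers are inlined as anonymous classes, and the usual Beam and java.sql imports are assumed; each transform is described in detail in the sections that follow.

// Build one DataSourceConfiguration and reuse it for both the read and the write.
SingleStoreIO.DataSourceConfiguration config =
    SingleStoreIO.DataSourceConfiguration.create("svchost:3306")
        .withUsername("s2user")
        .withPassword("passw0rd")
        .withDatabase("dbExample");

Pipeline pipeline = Pipeline.create();

// Read: execute a single query against the source table and map each row to a KV pair.
PCollection<KV<Integer, String>> rows = pipeline.apply(
    SingleStoreIO.<KV<Integer, String>>read()
        .withDataSourceConfiguration(config)
        .withTable("sourceTable")
        .withRowMapper(new SingleStoreIO.RowMapper<KV<Integer, String>>() {
            @Override
            public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                return KV.of(resultSet.getInt(1), resultSet.getString(2));
            }
        }));

// Write: convert each element to a list of strings and load it into the target table.
rows.apply(
    SingleStoreIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(config)
        .withTable("targetTable")
        .withUserDataMapper(new SingleStoreIO.UserDataMapper<KV<Integer, String>>() {
            @Override
            public List<String> mapRow(KV<Integer, String> element) {
                return Arrays.asList(element.getKey().toString(), element.getValue());
            }
        }));

pipeline.run().waitUntilFinish();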
Read from SingleStore
The SingleStore I/O connector supports the following operations to read from tables:
- read() for sequential data reading, and
- readWithPartitions() for reading data in parallel.
Sequential Data Reading with read()
PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withStatementPreparator(<statementPreparator>)
        .withOutputParallelization(<value>)
        .withRowMapper(<rowMapper>));
Note
You must specify either the withTable() or the withQuery() parameter.
| Parameter | Description |
|---|---|
| .withDataSourceConfiguration() | Specifies the DataSourceConfiguration object. |
| .withTable() | Specifies the table to read data from. |
| .withQuery() | Specifies the query to execute. |
| .withStatementPreparator() | Specifies the StatementPreparator object. |
| .withRowMapper() | Specifies the RowMapper function. |
| .withOutputParallelization() | Indicates if the resulting PCollection is reshuffled. |
Here's an example:
SingleStoreIO.read()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withQuery("SELECT a*a FROM t")
    .withStatementPreparator(statementPreparator)
    .withOutputParallelization(true)
    .withRowMapper(rowMapper)
Parallel Data Reading with readWithPartitions()
PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>readWithPartitions()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withRowMapper(<rowMapper>));
Note
You must specify either the withTable() or the withQuery() parameter. Additionally, you must specify the withDatabase() parameter in the DataSourceConfiguration.
| Parameter | Description |
|---|---|
| .withDataSourceConfiguration() | Specifies the DataSourceConfiguration object. |
| .withTable() | Specifies the table to read data from. |
| .withQuery() | Specifies the query to execute. |
| .withRowMapper() | Specifies the RowMapper function. |
Here's an example:
SingleStoreIO.readWithPartitions()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withTable("t1")
    .withRowMapper(rowMapper)
StatementPreparator
The StatementPreparator object implements an interface that read() uses to set the parameters of the PreparedStatement.
public static class MyStatementPreparator implements SingleStoreIO.StatementPreparator {
    @Override
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setInt(1, 10);
    }
}
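For example, the preparator above can be paired with a parameterized query in read(). This is a sketch only: the connection details and table are illustrative, and MyRow and MyRowMapper refer to the user-defined types shown in the next sections.

// The "?" placeholder in the query is bound by MyStatementPreparator (to the value 10).
PCollection<MyRow> rows = pipeline.apply(
    SingleStoreIO.<MyRow>read()
        .withDataSourceConfiguration(
            SingleStoreIO.DataSourceConfiguration.create("svchost:3306")
                .withUsername("s2user")
                .withPassword("passw0rd")
                .withDatabase("dbExample"))
        .withQuery("SELECT ID, Name FROM readExample WHERE ID > ?")
        .withStatementPreparator(new MyStatementPreparator())
        .withRowMapper(new MyRowMapper()));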
RowMapper
The RowMapper function is used to convert each row of the ResultSet into an element of the resulting PCollection.
public static class MyRowMapper implements SingleStoreIO.RowMapper<MyRow> {
    @Override
    public MyRow mapRow(ResultSet resultSet) throws Exception {
        return MyRow.create(resultSet.getInt(1), resultSet.getString(2));
    }
}
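MyRow is not part of the connector; it stands for any user-defined element type. A minimal sketch of such a class, with the create() factory and the id()/name() accessors assumed to match the mapper examples on this page, could look like:

// A simple user-defined row type. Pipeline element types should be serializable
// (or have a registered coder), hence java.io.Serializable here.
public static class MyRow implements java.io.Serializable {
    private final Integer id;
    private final String name;

    private MyRow(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public static MyRow create(Integer id, String name) {
        return new MyRow(id, name);
    }

    public Integer id() {
        return id;
    }

    public String name() {
        return name;
    }
}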
Write to SingleStore
The write() transformation sends your PCollection object to your SingleStore database.
data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>")
        .withUserDataMapper(<userDataMapper>)
        .withBatchSize(<batch_size>));
| Parameter | Description |
|---|---|
| .withDataSourceConfiguration() | Specifies the DataSourceConfiguration object. |
| .withTable() | Specifies the table to write the data to. |
| .withBatchSize() | Specifies the number of rows loaded by a single LOAD DATA query. |
| .withUserDataMapper() | Specifies the UserDataMapper function. |
Here's an example:
SingleStoreIO.write()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withTable("t")
    .withBatchSize(1000)
    .withUserDataMapper(userDataMapper)
UserDataMapper
This function maps data from PCollection objects to a list of Strings, which is eventually converted into a set of CSV values and loaded into SingleStore.
public static class MyRowDataMapper implements SingleStoreIO.UserDataMapper<MyRow> {
    @Override
    public List<String> mapRow(MyRow element) {
        List<String> res = new ArrayList<>();
        res.add(element.id().toString());
        res.add(element.name());
        return res;
    }
}
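Assuming rows is a PCollection<MyRow>, the mapper above plugs into write() as shown in this sketch; the connection details and target table are illustrative placeholders.

// Each MyRow element is mapped to a list of strings and loaded with a LOAD DATA query.
rows.apply(
    SingleStoreIO.<MyRow>write()
        .withDataSourceConfiguration(
            SingleStoreIO.DataSourceConfiguration.create("svchost:3306")
                .withUsername("s2user")
                .withPassword("passw0rd")
                .withDatabase("dbExample"))
        .withTable("writeExample")
        .withUserDataMapper(new MyRowDataMapper()));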
Example
The following example shows how to connect to a SingleStore cluster from Apache Beam using the SingleStore I/O connector and perform read/write operations.
Connect to your SingleStore cluster, and run the following commands:
CREATE DATABASE dbExample;
USE dbExample;
CREATE TABLE readExample (
    ID INT,
    Name VARCHAR(32));
CREATE TABLE writeExample (
    ID INT,
    Name VARCHAR(32));
INSERT INTO readExample VALUES
    (1, 'Adam'),
    (2, 'Bill'),
    (3, 'Clark'),
    (4, 'Dennis'),
    (5, 'Erik'),
    (6, 'Finn'),
    (7, 'Grace'),
    (8, 'Henry');
Create a Maven project, and add the following dependencies to your pom.xml file:
<groupId>com.s2</groupId>
<artifactId>s2BeamExample</artifactId>
<version>1.0</version>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.44.0</version>
        <scope>runtime</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.singlestore/singlestore-jdbc-client -->
    <dependency>
        <groupId>com.singlestore</groupId>
        <artifactId>singlestore-jdbc-client</artifactId>
        <version>1.1.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-singlestore -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-singlestore</artifactId>
        <version>2.44.0</version>
    </dependency>
</dependencies>
Read Operation Example
Add the following Java code to the S2ReadExample class.
Once the data is loaded into the PCollection, it is converted from key/value pairs to strings and written to a CSV file.
Pipeline pipeline = Pipeline.create();

PCollection<KV<Integer, String>> data = pipeline.apply(
    SingleStoreIO.<KV<Integer, String>>read()
        .withDataSourceConfiguration(
            DataSourceConfiguration.create("svchost")
                .withUsername("s2user")
                .withPassword("pa55w0rd")
                .withDatabase("dbExample"))
        .withQuery("SELECT * FROM readExample WHERE ID > ?")
        .withStatementPreparator(new StatementPreparator() {
            public void setParameters(PreparedStatement preparedStatement) throws Exception {
                preparedStatement.setInt(1, 4);
            }
        })
        .withRowMapper(new RowMapper<KV<Integer, String>>() {
            public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                return KV.of(resultSet.getInt(1), resultSet.getString(2));
            }
        }));

data.apply(MapElements.into(TypeDescriptors.strings())
        .via((KV<Integer, String> kv) -> kv.getKey() + "," + kv.getValue()))
    .apply(TextIO.write().to("/path/to/output").withNumShards(1).withSuffix(".csv"));

pipeline.run().waitUntilFinish();
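The snippet above is assumed to run inside the S2ReadExample class, for example in its main method, with the imports below; the exact import list and class layout are assumptions, not part of the original example.

package com.s2.beam;

// Imports assumed by the read example above.
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.RowMapper;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.StatementPreparator;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class S2ReadExample {
    public static void main(String[] args) {
        // Place the pipeline code from this section here.
    }
}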
Run the following commands:
mvn clean compile assembly:single
java com.s2.beam.S2ReadExample
After the code runs successfully, the resulting CSV file contains the following data:
5,Erik
6,Finn
7,Grace
8,Henry
Write Operation Example
Add the following Java code to the S2WriteExample class.
Pipeline pipeline = Pipeline.create();

PCollection<String> lines = pipeline.apply(TextIO.read().from("/path/to/output.csv"));

PCollection<KV<Integer, String>> keyValues = lines.apply(
    MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
        .via((String line) -> {
            String[] fields = line.split(",");
            return KV.of(Integer.parseInt(fields[0]), fields[1]);
        }));

keyValues.apply(
    SingleStoreIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(
            DataSourceConfiguration.create("svchost")
                .withUsername("s2user")
                .withPassword("pa55w0rd")
                .withDatabase("dbExample"))
        .withTable("writeExample")
        .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
            public List<String> mapRow(KV<Integer, String> element) {
                List<String> result = new ArrayList<>();
                result.add(element.getKey().toString());
                result.add(element.getValue());
                return result;
            }
        }));

pipeline.run().waitUntilFinish();
Run the following commands:
mvn clean compile assembly:single
java com.s2.beam.S2WriteExample
After the code runs successfully, run the following SQL query:
SELECT * FROM writeExample;
+----+-------+
| ID | Name  |
+----+-------+
|  5 | Erik  |
|  6 | Finn  |
|  7 | Grace |
|  8 | Henry |
+----+-------+
The contents of the CSV file are now inserted into the writeExample table.
Last modified: October 25, 2023