Load Data from Apache Beam
The SingleStoreDB I/O connector allows you to access your SingleStoreDB databases from Apache Beam using the SingleStore JDBC Driver. You can also use this connector to export data from your SingleStoreDB databases to Apache Beam.
For information on Apache Beam concepts, see Apache Beam Documentation.
Prerequisites
To use the SingleStoreDB I/O connector, add the following Maven artifact dependency to your pom.xml file:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>{{< param release_latest >}}</version>
</dependency>
Configure the Data Source
To configure the SingleStoreDB I/O connection, define a DataSourceConfiguration object with the connection information in the following format:

SingleStoreIO.DataSourceConfiguration
    .create("<hostname>:<port>")
    .withDatabase("<database_name>")
    .withConnectionProperties("<connection_properties>")
    .withPassword("<password>")
    .withUsername("<username>");
Parameter | Description |
---|---|
.create("<hostname>:<port>") | Specifies the hostname or IP address of the connection in the <hostname>:<port> format. |
.withDatabase("<database_name>") | Specifies the database name. If this parameter is set, the connection uses the specified database. |
.withConnectionProperties("<connection_properties>") | Specifies a list of connection string parameters for the JDBC driver in the key1=value1;key2=value2 format. |
.withUsername("<username>") | Specifies the username of the SingleStoreDB user. |
.withPassword("<password>") | Specifies the password of the SingleStoreDB user. |
Here's a sample configuration:
SingleStoreIO.DataSourceConfiguration .create("svchost:3306") .withDatabase("dbTest") .withConnectionProperties("connectTimeout=30000;useServerPrepStmts=FALSE") .withPassword("passw0rd") .withUsername("s2user");
SingleStoreDB I/O Transforms
SingleStoreDB supports the following PTransforms:

Read: Reads data sequentially by executing a single query.
ReadWithPartitions: Reads data in parallel.
Write: Writes data using LOAD DATA queries.
All read transforms return bounded PCollections. Write transforms can write both bounded and unbounded PCollections.
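To show how these transforms fit together, here is a minimal end-to-end sketch that reads rows from one table and writes them into another. The host, credentials, table names, and the KV element type are placeholders chosen for illustration, not part of the connector API:

import java.sql.ResultSet;
import java.util.Arrays;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class SingleStoreCopyPipeline {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Connection details are placeholders; adjust them to your cluster.
        SingleStoreIO.DataSourceConfiguration config =
            SingleStoreIO.DataSourceConfiguration.create("svchost:3306")
                .withDatabase("dbTest")
                .withUsername("s2user")
                .withPassword("passw0rd");

        // Read: each ResultSet row becomes a KV<Integer, String> element.
        PCollection<KV<Integer, String>> rows = pipeline.apply(
            SingleStoreIO.<KV<Integer, String>>read()
                .withDataSourceConfiguration(config)
                .withTable("t")
                .withRowMapper(new SingleStoreIO.RowMapper<KV<Integer, String>>() {
                    @Override
                    public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                        return KV.of(resultSet.getInt(1), resultSet.getString(2));
                    }
                }));

        // Write: each element is mapped to a list of Strings and loaded via LOAD DATA.
        rows.apply(
            SingleStoreIO.<KV<Integer, String>>write()
                .withDataSourceConfiguration(config)
                .withTable("t_copy")
                .withUserDataMapper(new SingleStoreIO.UserDataMapper<KV<Integer, String>>() {
                    @Override
                    public List<String> mapRow(KV<Integer, String> element) {
                        return Arrays.asList(element.getKey().toString(), element.getValue());
                    }
                }));

        pipeline.run().waitUntilFinish();
    }
}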
Read from SingleStoreDB
The SingleStoreDB I/O connector supports two operations for reading from tables: .read() for sequential data reading, and .readWithPartitions() for reading data in parallel.
Sequential Data Reading with .read()
PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withStatementPreparator(<statementPreparator>)
        .withOutputParallelization(<value>)
        .withRowMapper(<rowMapper>)
);
Note

You must specify either the withTable() or the withQuery() parameter.
Parameter | Description |
---|---|
.withDataSourceConfiguration() | Specifies the DataSourceConfiguration object. |
.withTable() | Specifies the table to read data from. |
.withQuery() | Specifies the query to execute. |
.withStatementPreparator() | Specifies the StatementPreparator object. |
.withRowMapper() | Specifies the RowMapper function. |
.withOutputParallelization() | Indicates if the resulting PCollection is reshuffled to facilitate parallel processing. |
Here's an example:
SingleStoreIO.read()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withQuery("SELECT a*a FROM t")
    .withStatementPreparator(statementPreparator)
    .withOutputParallelization(true)
    .withRowMapper(rowMapper)
Parallel Data Reading with .readWithPartitions()
PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>readWithPartitions()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withRowMapper(<rowMapper>)
);
Note

You must specify either the withTable() or the withQuery() parameter. Additionally, you must specify the withDatabase() parameter in the DataSourceConfiguration.
Parameter | Description |
---|---|
.withDataSourceConfiguration() | Specifies the DataSourceConfiguration object. |
.withTable() | Specifies the table to read data from. |
.withQuery() | Specifies the query to execute. |
.withRowMapper() | Specifies the RowMapper function. |
Here's an example:
SingleStoreIO.readWithPartitions()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withTable("t1")
    .withRowMapper(rowMapper)
StatementPreparator
The StatementPreparator object implements an interface that is used by .read() to set the parameters of the PreparedStatement. Here's an example:
public static class MyStatementPreparator implements SingleStoreIO.StatementPreparator {
    @Override
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setInt(1, 10);
    }
}
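For reference, here is a sketch of how such a preparator might be wired into a .read() call. The query, column name, and the dataSourceConfiguration variable are assumptions for illustration; the single ? placeholder in the query is the parameter that setParameters() binds before the read runs:

PCollection<Integer> values = pipeline.apply(
    SingleStoreIO.<Integer>read()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .withQuery("SELECT a FROM t WHERE a < ?")                // '?' is bound by the preparator
        .withStatementPreparator(new MyStatementPreparator())    // sets parameter 1 to 10
        .withRowMapper(new SingleStoreIO.RowMapper<Integer>() {
            @Override
            public Integer mapRow(ResultSet resultSet) throws Exception {
                return resultSet.getInt(1);
            }
        }));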
RowMapper
The RowMapper function is used to convert each row of the ResultSet into an element of the resulting PCollection. Here's an example:
public static class MyRowMapper implements SingleStoreIO.RowMapper<MyRow> {
    @Override
    public MyRow mapRow(ResultSet resultSet) throws Exception {
        return MyRow.create(resultSet.getInt(1), resultSet.getString(2));
    }
}
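The MyRow type used above is not part of the connector; it stands in for whatever element type your pipeline uses. A minimal sketch of such a value class could look like this:

// A hypothetical MyRow value class matching the RowMapper above. It implements
// java.io.Serializable so Beam can fall back to SerializableCoder when encoding it.
public class MyRow implements Serializable {
    private final Integer id;
    private final String name;

    private MyRow(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public static MyRow create(Integer id, String name) {
        return new MyRow(id, name);
    }

    public Integer id() {
        return id;
    }

    public String name() {
        return name;
    }
}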
Write to SingleStoreDB
The .write() transformation sends your PCollection object to your SingleStoreDB databases. It returns the total number of rows written by each batch of elements. Use the following format:
data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>")
        .withUserDataMapper(<userDataMapper>)
        .withBatchSize(<batch_size>)
);
Parameter | Description |
---|---|
.withDataSourceConfiguration() | Specifies the DataSourceConfiguration object. |
.withTable() | Specifies the table to write data to. |
.withBatchSize() | Specifies the number of rows loaded by a single LOAD DATA query. |
.withUserDataMapper() | Specifies the UserDataMapper function. |
Here's an example:
SingleStoreIO.write()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withTable("t")
    .withBatchSize(1000)
    .withUserDataMapper(userDataMapper)
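Because .write() outputs the number of rows written per batch, you can capture and aggregate that count. A sketch, assuming the data, dataSourceConfiguration, and userDataMapper placeholders from the examples above and that the per-batch counts are emitted as a PCollection of Integers:

// Each element of rowsPerBatch is the row count of one LOAD DATA batch.
PCollection<Integer> rowsPerBatch = data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .withTable("t")
        .withUserDataMapper(userDataMapper));

// Sum the per-batch counts to get the total number of rows written.
PCollection<Integer> totalRows = rowsPerBatch.apply(Sum.integersGlobally());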
UserDataMapper
This function maps each element of the PCollection to a list of Strings, which is eventually converted into a set of CSV values and loaded into SingleStoreDB. Here's an example:
public static class MyRowDataMapper implements SingleStoreIO.UserDataMapper<MyRow> {
    @Override
    public List<String> mapRow(MyRow element) {
        List<String> res = new ArrayList<>();
        res.add(element.id().toString());
        res.add(element.name());
        return res;
    }
}