
Load Data from Apache Beam

The SingleStoreDB I/O connector allows you to access your SingleStoreDB databases from Apache Beam using the SingleStore JDBC driver. You can use this connector both to read data from and to write data to SingleStoreDB.

For information on Apache Beam concepts, see Apache Beam Documentation.

Prerequisites

To use the SingleStoreDB I/O connector, add the following Maven artifact dependency to your pom.xml file, replacing <beam_version> with the latest Apache Beam release version:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version><beam_version></version>
</dependency>

Configure the Data Source

Create a DataSourceConfiguration object to configure the SingleStoreDB I/O connection. Define it with the connection information in the following format:

SingleStoreIO.DataSourceConfiguration
    .create("<hostname>:<port>")
    .withDatabase("<database_name>")
    .withConnectionProperties("<connection_properties>")
    .withPassword("<password>")
    .withUsername("<username>");

  • .create() (Required): Specifies the hostname or IP address of the SingleStoreDB cluster in the <hostname>:<port> format. The default port is 3306.

  • .withDatabase(): Specifies the database name. If this parameter is set, the connection uses the specified database.

  • .withConnectionProperties(): Specifies a list of connection string parameters for the JDBC driver in the "<parameter>=<value>[;<parameter>=<value>...]" format. See The SingleStore JDBC Driver for more information.

  • .withUsername(): Specifies the username of the SingleStoreDB user.

  • .withPassword(): Specifies the password of the SingleStoreDB user.

Here's a sample configuration:

SingleStoreIO.DataSourceConfiguration
    .create("svchost:3306")
    .withDatabase("dbTest")
    .withConnectionProperties("connectTimeout=30000;useServerPrepStmts=FALSE")
    .withPassword("passw0rd")
    .withUsername("s2user");

SingleStoreDB I/O Transforms

The SingleStoreDB I/O connector supports the following PTransforms:

  • Read: Reads data sequentially by executing a single query.

  • ReadWithPartitions: Reads data in parallel.

  • Write: Writes data using LOAD DATA queries.

All read transforms return bounded PCollections. Write transforms can write both bounded and unbounded PCollections.
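The following end-to-end sketch shows how these transforms fit together in a pipeline. It is illustrative: the endpoint, credentials, and table names are placeholders, and it assumes the MyRowMapper, MyRowDataMapper, and MyRow classes defined later in this section.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class SingleStoreExample {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        SingleStoreIO.DataSourceConfiguration config =
            SingleStoreIO.DataSourceConfiguration
                .create("svchost:3306")
                .withDatabase("dbTest")
                .withUsername("s2user")
                .withPassword("passw0rd");

        // Read the source table into a bounded PCollection.
        PCollection<MyRow> rows = pipeline.apply(
            SingleStoreIO.<MyRow>read()
                .withDataSourceConfiguration(config)
                .withTable("t")
                .withRowMapper(new MyRowMapper()));

        // Write the rows to another table using LOAD DATA batches.
        rows.apply(
            SingleStoreIO.<MyRow>write()
                .withDataSourceConfiguration(config)
                .withTable("t_copy")
                .withUserDataMapper(new MyRowDataMapper()));

        pipeline.run().waitUntilFinish();
    }
}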

Read from SingleStoreDB

The SingleStoreDB I/O connector supports the following operations to read from tables:

  • .read() for sequential data reading, and

  • .readWithPartitions() for reading data in parallel.

Sequential Data Reading with .read()

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withStatementPreparator(<statementPreparator>)
        .withOutputParallelization(<value>)
        .withRowMapper(<rowMapper>)
);

Note

You must specify either the withTable() or the withQuery() parameter.

  • .withDataSourceConfiguration(<dataSourceConfiguration>) (Required): Specifies the DataSourceConfiguration object. See Configure the Data Source for more information.

  • .withTable("<table_name>"): Specifies the table to read data from.

  • .withQuery("<query_text>"): Specifies the query to execute.

  • .withStatementPreparator(<statementPreparator>): Specifies the StatementPreparator object.

  • .withRowMapper(<rowMapper>) (Required): Specifies the RowMapper object.

  • .withOutputParallelization(<value>): Indicates whether the resulting PCollection is reshuffled. By default, it is set to true.

Here's an example:

SingleStoreIO.read()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withQuery("SELECT a*a FROM t")
    .withStatementPreparator(statementPreparator)
    .withOutputParallelization(true)
    .withRowMapper(rowMapper);

Parallel Data Reading with .readWithPartitions()

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>readWithPartitions()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withRowMapper(<rowMapper>)
);

Note

You must specify either the withTable() or the withQuery() parameter. Additionally, you must specify the .withDatabase() parameter in the DataSourceConfiguration.

  • .withDataSourceConfiguration(<dataSourceConfiguration>) (Required): Specifies the DataSourceConfiguration object. See Configure the Data Source for more information.

  • .withTable("<table_name>"): Specifies the table to read data from.

  • .withQuery("<query_text>"): Specifies the query to execute.

  • .withRowMapper(<rowMapper>) (Required): Specifies the RowMapper function.

Here's an example:

SingleStoreIO.readWithPartitions()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withTable("t1")
    .withRowMapper(rowMapper);

StatementPreparator

StatementPreparator is an interface used by .read() to set the parameters of the PreparedStatement, for example, to bind values to placeholders in the query. Here's an example implementation:

public static class MyStatementPreparator implements SingleStoreIO.StatementPreparator {
    @Override
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setInt(1, 10);
    }
}
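
For instance, this preparator could bind the placeholder in a parameterized read. The query and table below are illustrative:

PCollection<MyRow> filtered = pipeline.apply(
    SingleStoreIO.<MyRow>read()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .withQuery("SELECT id, name FROM t WHERE id > ?")
        .withStatementPreparator(new MyStatementPreparator())
        .withRowMapper(new MyRowMapper()));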

RowMapper

The RowMapper function is used to convert each row of the ResultSet into an element of the resulting PCollection. Here's an example:

public static class MyRowMapper implements SingleStoreIO.RowMapper<MyRow> {
    @Override
    public MyRow mapRow(ResultSet resultSet) throws Exception {
        return MyRow.create(resultSet.getInt(1), resultSet.getString(2));
    }
}
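
Both mapper examples assume a user-defined MyRow value type; this type is not part of the connector. A minimal hand-written version could look like this:

public class MyRow implements java.io.Serializable {
    private final Integer id;
    private final String name;

    private MyRow(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    // Static factory used by the RowMapper example above.
    public static MyRow create(Integer id, String name) {
        return new MyRow(id, name);
    }

    public Integer id() { return id; }
    public String name() { return name; }
}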

Write to SingleStoreDB

The .write() transform writes the elements of a PCollection to your SingleStoreDB database. It returns the number of rows written by each batch of elements. Use the following format:

data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>")
        .withUserDataMapper(<userDataMapper>)
        .withBatchSize(<batch_size>)
);

  • .withDataSourceConfiguration(<dataSourceConfiguration>) (Required): Specifies the DataSourceConfiguration object. See Configure the Data Source for more information.

  • .withTable("<table_name>") (Required): Specifies the table to write data to.

  • .withBatchSize(<batch_size>): Specifies the number of rows loaded by a single LOAD DATA query. The default value is 100000.

  • .withUserDataMapper(<userDataMapper>) (Required): Specifies the UserDataMapper function.

Here's an example:

SingleStoreIO.write()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withTable("t")
    .withBatchSize(1000)
    .withUserDataMapper(userDataMapper);

UserDataMapper

This function maps each element of the PCollection to a List of Strings, which is eventually converted into a set of CSV values and loaded into SingleStoreDB. Here's an example:

public static class MyRowDataMapper implements SingleStoreIO.UserDataMapper<MyRow> {
    @Override
    public List<String> mapRow(MyRow element) {
        List<String> res = new ArrayList<>();
        res.add(element.id().toString());
        res.add(element.name());
        return res;
    }
}
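
Putting the pieces together, a write might look like the following sketch. The in-memory input and table name are illustrative, and MyRow is the value type defined above:

PCollection<MyRow> rows = pipeline.apply(
    Create.of(MyRow.create(1, "Alice"), MyRow.create(2, "Bob")));

// write() reports the number of rows written by each batch.
PCollection<Integer> rowsWritten = rows.apply(
    SingleStoreIO.<MyRow>write()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .withTable("t")
        .withUserDataMapper(new MyRowDataMapper()));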