# Load Data from Apache Beam

The SingleStore I/O connector allows you to access your SingleStore databases from Apache Beam using [the SingleStore JDBC Driver](https://docs.singlestore.com/db/v9.1/developer-resources/connect-with-application-development-tools/connect-with-java-jdbc/the-singlestore-jdbc-driver.md). You can use the connector both to read data from SingleStore into a Beam pipeline and to write data from a pipeline into SingleStore.

For information on Apache Beam concepts, see [Apache Beam Documentation](https://beam.apache.org/documentation/).

## Prerequisites

To use the SingleStore I/O connector, add the following Maven dependency to your `pom.xml` file:

```xml
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>{{< param release_latest >}}</version>
</dependency>
```

## Configure the Data Source

To configure the SingleStore I/O connection, define a `DataSourceConfiguration` object with the connection information in the following format:

```java
SingleStoreIO.DataSourceConfiguration
    .create("<hostname>:<port>")
    .withDatabase("<database_name>")
    .withConnectionProperties("<connection_properties>")
    .withPassword("<password>")
    .withUsername("<username>");
```

| Parameter | Description |
| --- | --- |
| `.create()` (Required) | Specifies the hostname or IP address and the port of the SingleStore endpoint in the `<hostname>:<port>` format. The default port is `3306`. |
| `.withDatabase()` | Specifies the database name. If this parameter is set, the connection uses the specified database. |
| `.withConnectionProperties()` | Specifies a list of [connection string parameters](https://docs.singlestore.com/db/v9.1/developer-resources/connect-with-application-development-tools/connect-with-java-jdbc/the-singlestore-jdbc-driver/#section-idm4492642533297632740896869469.md) for the JDBC driver in the `"<parameter>=<value>[;<parameter>=<value>...]"` format. |
| `.withUsername()` | Specifies the username of the SingleStore user. |
| `.withPassword()` | Specifies the password of the SingleStore user. |

Here's a sample configuration:

```java
SingleStoreIO.DataSourceConfiguration
    .create("svchost:3306")
    .withDatabase("dbTest")
    .withConnectionProperties("connectTimeout=30000;useServerPrepStmts=FALSE")
    .withPassword("passw0rd")
    .withUsername("s2user");
```
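
The connection properties value is a single string of `<parameter>=<value>` pairs separated by semicolons. If you keep these settings in a map, a small helper can assemble the string. The sketch below uses a hypothetical `ConnectionProps.format` helper (it is not part of SingleStoreIO) and assumes that keys and values contain no `;` or `=` characters.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of SingleStoreIO): builds the
// "<parameter>=<value>[;<parameter>=<value>...]" string for withConnectionProperties().
// Assumes keys and values contain no ';' or '=' characters.
final class ConnectionProps {
    static String format(Map<String, String> props) {
        StringJoiner joiner = new StringJoiner(";");
        for (Map.Entry<String, String> entry : props.entrySet()) {
            joiner.add(entry.getKey() + "=" + entry.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, so the output is deterministic
        Map<String, String> props = new LinkedHashMap<>();
        props.put("connectTimeout", "30000");
        props.put("useServerPrepStmts", "FALSE");
        // Prints: connectTimeout=30000;useServerPrepStmts=FALSE
        System.out.println(format(props));
    }
}
```

The resulting string is the same value used in the sample configuration and can be passed to `.withConnectionProperties(...)`.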

## SingleStore I/O Transforms

The SingleStore I/O connector supports the following `PTransform`s:

* `Read`: Reads data sequentially by executing a single query.
* `ReadWithPartitions`: Reads data in parallel.
* `Write`: Writes data using `LOAD DATA` queries.

All read transforms return bounded `PCollection`s. The write transform accepts both bounded and unbounded `PCollection`s.

## Read from SingleStore

The SingleStore I/O connector supports the following operations to read from tables:

* `.read()` for sequential data reading, and
* `.readWithPartitions()` for reading data in parallel.

## Sequential Data Reading with `.read()`

```java
PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withStatementPreparator(<statementPreparator>)
        .withOutputParallelization(<value>)
        .withRowMapper(<rowMapper>)
);
```

> **📝 Note**: You must specify either the `withTable()` or the `withQuery()` parameter.

| Parameter | Description |
| --- | --- |
| `.withDataSourceConfiguration(<dataSourceConfiguration>)` (Required) | Specifies the `DataSourceConfiguration` object. See [Configure the Data Source](https://docs.singlestore.com/#section-idm4522194824188833503606758322.md) for more information. |
| `.withTable("<table_name>")` | Specifies the table to read data from. |
| `.withQuery("<query_text>")` | Specifies the query to execute. |
| `.withStatementPreparator(<statementPreparator>)` | Specifies the [`StatementPreparator`](https://docs.singlestore.com/#section-idm456376309617923350368853301.md) object. |
| `.withRowMapper(<rowMapper>)` (Required) | Specifies the [`RowMapper`](https://docs.singlestore.com/#section-idm4625920250897633503690110446.md) object. |
| `.withOutputParallelization(<value>)` | Specifies whether the resulting `PCollection` is reshuffled. The default is `true`. |

Here's an example:

```java
SingleStoreIO.read()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withQuery("SELECT a*a FROM t")
    .withStatementPreparator(statementPreparator)
    .withOutputParallelization(true)
    .withRowMapper(rowMapper)
```
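
For both read transforms, you specify either `withTable()` or `withQuery()`. Conceptually, reading a table is the same as running a query that selects all of its rows. The sketch below illustrates the rule with a hypothetical `ReadSource.resolveQuery` helper; it assumes the two options are mutually exclusive and that a table read is equivalent to `SELECT * FROM` the table, which may not match how SingleStoreIO builds its query internally.

```java
// Hypothetical illustration of the "withTable() or withQuery()" rule.
// Assumes the options are mutually exclusive and that a table read is
// equivalent to selecting all rows; SingleStoreIO may build its query differently.
final class ReadSource {
    static String resolveQuery(String table, String query) {
        if ((table == null) == (query == null)) {
            throw new IllegalArgumentException("Specify exactly one of a table or a query");
        }
        if (query != null) {
            return query;
        }
        // Backticks quote identifiers in MySQL-compatible SQL; embedded backticks are doubled
        return "SELECT * FROM `" + table.replace("`", "``") + "`";
    }

    public static void main(String[] args) {
        System.out.println(resolveQuery("readExample", null)); // SELECT * FROM `readExample`
        System.out.println(resolveQuery(null, "SELECT * FROM readExample WHERE ID > ?"));
    }
}
```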

## Parallel Data Reading with `.readWithPartitions()`

```java
PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>readWithPartitions()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>") // or .withQuery("<query_text>")
        .withRowMapper(<rowMapper>)
);
```

> **📝 Note**: You must specify either the `withTable()` or the `withQuery()` parameter. Additionally, you must specify the `.withDatabase()` parameter in the `DataSourceConfiguration`.

| Parameter | Description |
| --- | --- |
| `.withDataSourceConfiguration(<dataSourceConfiguration>)` (Required) | Specifies the `DataSourceConfiguration` object. See [Configure the Data Source](https://docs.singlestore.com/#section-idm4522194824188833503606758322.md) for more information. |
| `.withTable("<table_name>")` | Specifies the table to read data from. |
| `.withQuery("<query_text>")` | Specifies the query to execute. |
| `.withRowMapper(<rowMapper>)` (Required) | Specifies the [`RowMapper`](https://docs.singlestore.com/#section-idm4625920250897633503690110446.md) function. |

Here's an example:

```java
SingleStoreIO.readWithPartitions()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withTable("t1")
    .withRowMapper(rowMapper)
```

## `StatementPreparator`

`StatementPreparator` is an interface that `.read()` uses to set the parameters of the `PreparedStatement` created from the query, for example to bind values to `?` placeholders. Here's an example:

```java
public static class MyStatementPreparator implements SingleStoreIO.StatementPreparator {
    @Override
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
        // Bind 10 to the first "?" placeholder in the query (JDBC indexes start at 1)
        preparedStatement.setInt(1, 10);
    }
}
```
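
`setParameters()` must set one value per `?` placeholder, using 1-based indexes as in standard JDBC. As a rough sanity check while writing a preparator, you can count the placeholders in your query. The hypothetical `Placeholders.count` helper below skips `?` characters inside single-quoted string literals, but it does not handle comments, double-quoted strings, or backslash escapes.

```java
// Hypothetical sanity check: counts "?" placeholders outside single-quoted
// string literals. Does not handle comments, double-quoted strings, or
// backslash escapes.
final class Placeholders {
    static int count(String sql) {
        int placeholders = 0;
        boolean inLiteral = false;
        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles twice, leaving the state unchanged
                inLiteral = !inLiteral;
            } else if (c == '?' && !inLiteral) {
                placeholders++;
            }
        }
        return placeholders;
    }

    public static void main(String[] args) {
        // One placeholder, so the preparator should set a value for index 1 only
        System.out.println(count("SELECT * FROM readExample WHERE ID > ?")); // 1
        System.out.println(count("SELECT '?' AS q FROM t WHERE a = ? AND b = ?")); // 2
    }
}
```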

## `RowMapper`

The `RowMapper` function is used to convert each row of the `ResultSet` into an element of the resulting `PCollection`. Here's an example:

```java
public static class MyRowMapper implements SingleStoreIO.RowMapper<MyRow> {
    @Override
    public MyRow mapRow(ResultSet resultSet) throws Exception {
        return MyRow.create(resultSet.getInt(1), resultSet.getString(2));
    }
}
```
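
The `RowMapper` and `UserDataMapper` examples use a `MyRow` type without defining it. One possible definition, assuming Java 16 or later for records, is a small immutable record that provides the `create()`, `id()`, and `name()` methods those examples call. It implements `Serializable`, which should let Beam fall back to `SerializableCoder` for it; a schema-based or custom coder is usually more efficient. The field types are an assumption.

```java
import java.io.Serializable;

// Hypothetical definition of the MyRow type used by MyRowMapper and
// MyRowDataMapper (requires Java 16+ for records).
// Integer (not int) keeps element.id().toString() valid in the UserDataMapper example.
record MyRow(Integer id, String name) implements Serializable {
    // Factory method matching the MyRow.create(...) call in the RowMapper example
    static MyRow create(Integer id, String name) {
        return new MyRow(id, name);
    }

    public static void main(String[] args) {
        // resultSet.getInt(1) returns an int, which is autoboxed to Integer here
        MyRow row = MyRow.create(5, "Erik");
        System.out.println(row); // MyRow[id=5, name=Erik]
    }
}
```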

## Write to SingleStore

The `.write()` transform writes the elements of a `PCollection` to a SingleStore table. It returns a `PCollection<Integer>` that contains the number of rows written by each batch of elements. Use the following format:

```java
data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(<dataSourceConfiguration>)
        .withTable("<table_name>")
        .withUserDataMapper(<userDataMapper>)
        .withBatchSize(<batch_size>)
);
```

| Parameter | Description |
| --- | --- |
| `.withDataSourceConfiguration(<dataSourceConfiguration>)` (Required) | Specifies the `DataSourceConfiguration` object. See [Configure the Data Source](https://docs.singlestore.com/#section-idm4522194824188833503606758322.md) for more information. |
| `.withTable("<table_name>")` (Required) | Specifies the table to write data to. |
| `.withBatchSize(<batch_size>)` | Specifies the maximum number of rows loaded by a single `LOAD DATA` query. The default value is `100000`. |
| `.withUserDataMapper(<userDataMapper>)` (Required) | Specifies the [`UserDataMapper`](https://docs.singlestore.com/#section-idm4581682772566433503694428191.md) function. |

Here's an example:

```java
SingleStoreIO.write()
    .withDataSourceConfiguration(
        SingleStoreIO.DataSourceConfiguration.create("myHost:3306")
            .withUsername("root")
            .withPassword("secret")
            .withDatabase("db"))
    .withTable("t")
    .withBatchSize(1000)
    .withUserDataMapper(userDataMapper)
```
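
Because each `LOAD DATA` query carries at most `batchSize` rows, writing N rows needs at least ⌈N / batchSize⌉ queries; with `withBatchSize(1000)`, 2,500 rows need at least three. The sketch below only illustrates that arithmetic with a hypothetical `Batches.split` helper. It is not SingleStoreIO's implementation: the connector processes bundles chosen by the runner, so a real run can issue more, smaller queries.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Conceptual sketch only (not SingleStoreIO's implementation): groups rows so that
// no group exceeds batchSize, mirroring the cap withBatchSize() places on each
// LOAD DATA query.
final class Batches {
    static <T> List<List<T>> split(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            int end = Math.min(start + batchSize, rows.size());
            batches.add(new ArrayList<>(rows.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> rows = Collections.nCopies(2500, "row");
        // With a batch size of 1000, 2500 rows split into groups of 1000, 1000, and 500
        for (List<String> batch : split(rows, 1000)) {
            System.out.println(batch.size());
        }
    }
}
```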

## `UserDataMapper`

`UserDataMapper` maps each element of the `PCollection` to a list of strings, typically one value per table column, which is then converted into CSV values and loaded into SingleStore. Here's an example:

```java
public static class MyRowDataMapper implements SingleStoreIO.UserDataMapper<MyRow> {
    @Override
    public List<String> mapRow(MyRow element) {
        List<String> res = new ArrayList<>();
        res.add(element.id().toString());
        res.add(element.name());
        return res;
    }
}
```
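
The list returned by `mapRow()` becomes one CSV row. The sketch below shows what that conversion could look like with RFC 4180-style quoting: fields that contain a comma, double quote, or line break are wrapped in double quotes, and embedded quotes are doubled. `CsvLine.format` is a hypothetical helper; SingleStoreIO's actual serialization and escaping rules may differ, and the sketch assumes non-null values.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: joins the strings returned by a UserDataMapper into one CSV
// line with RFC 4180-style quoting. SingleStoreIO's real format may differ.
// Assumes no null values.
final class CsvLine {
    static String format(List<String> fields) {
        return fields.stream().map(CsvLine::quote).collect(Collectors.joining(","));
    }

    private static String quote(String field) {
        boolean needsQuotes = field.contains(",") || field.contains("\"")
                || field.contains("\n") || field.contains("\r");
        // Embedded double quotes are escaped by doubling them
        return needsQuotes ? "\"" + field.replace("\"", "\"\"") + "\"" : field;
    }

    public static void main(String[] args) {
        System.out.println(format(List.of("1", "Adam")));        // 1,Adam
        System.out.println(format(List.of("9", "Smith, John"))); // 9,"Smith, John"
    }
}
```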

## Example

The following example shows how to connect to a SingleStore cluster from Apache Beam using the SingleStore I/O connector and perform read/write operations.

Connect to your SingleStore cluster and run the following SQL commands:

```sql
CREATE DATABASE dbExample;
USE dbExample;

CREATE TABLE readExample (
	ID INT,
	Name VARCHAR(32)
);

CREATE TABLE writeExample (
	ID INT,
	Name VARCHAR(32)
);

INSERT INTO readExample VALUES
(1, 'Adam'),
(2, 'Bill'),
(3, 'Clark'),
(4, 'Dennis'),
(5, 'Erik'),
(6, 'Finn'),
(7, 'Grace'),
(8, 'Henry');
```

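Create a Maven project and add the following dependencies and build configuration to your **pom.xml** file. The `maven-assembly-plugin` configuration is what allows the `assembly:single` goal used in the run commands to package the classes and their dependencies into a single JAR.

```xml
<groupId>com.s2</groupId>
<artifactId>s2BeamExample</artifactId>
<version>1.0</version>

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.44.0</version>
    <scope>runtime</scope>
  </dependency>

  <!-- https://mvnrepository.com/artifact/com.singlestore/singlestore-jdbc-client -->
  <dependency>
    <groupId>com.singlestore</groupId>
    <artifactId>singlestore-jdbc-client</artifactId>
    <version>1.1.4</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-singlestore -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>2.44.0</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <!-- Builds target/s2BeamExample-1.0-jar-with-dependencies.jar -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>
  </plugins>
</build>
```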

## Read Operation Example

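Create a class named **S2ReadExample** in the `com.s2.beam` package with the following code. It defines the data source configuration for the SingleStore I/O connection and reads the rows with `ID > 4` from the **readExample** table as key/value pairs.

Once the data is loaded in the `PCollection`, it is converted from key/value pairs to strings and written to a single CSV file, `/path/to/output.csv`.

```java
package com.s2.beam;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.RowMapper;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.StatementPreparator;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class S2ReadExample {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Read the rows with ID > 4 from readExample as (ID, Name) pairs
        PCollection<KV<Integer, String>> data = pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create("svchost")
                .withUsername("s2user")
                .withPassword("pa55w0rd")
                .withDatabase("dbExample"))
            .withQuery("SELECT * FROM readExample WHERE ID > ?")
            .withStatementPreparator(new StatementPreparator() {
                @Override
                public void setParameters(PreparedStatement preparedStatement) throws Exception {
                    preparedStatement.setInt(1, 4); // binds the "?" placeholder
                }
            })
            .withRowMapper(new RowMapper<KV<Integer, String>>() {
                @Override
                public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                    return KV.of(resultSet.getInt(1), resultSet.getString(2));
                }
            }));

        // Convert each pair to an "ID,Name" line; withoutSharding() writes exactly /path/to/output.csv
        data
            .apply(MapElements
                .into(TypeDescriptors.strings())
                .via((KV<Integer, String> kv) -> kv.getKey() + "," + kv.getValue()))
            .apply(TextIO.write().to("/path/to/output").withoutSharding().withSuffix(".csv"));

        // Nothing executes until the pipeline is run
        pipeline.run().waitUntilFinish();
    }
}
```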

Run the following commands:

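```shell
mvn clean compile assembly:single
# Assumes the maven-assembly-plugin jar-with-dependencies descriptor, which produces this JAR name
java -cp target/s2BeamExample-1.0-jar-with-dependencies.jar com.s2.beam.S2ReadExample
```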

After the code runs successfully, the resulting CSV file contains the following data (the order of the rows may vary):

```
5,Erik
6,Finn
7,Grace
8,Henry
```

## Write Operation Example

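Create a class named **S2WriteExample** in the `com.s2.beam` package with the following code. It defines the data source configuration for the SingleStore I/O connection, reads the data from the CSV file created in the read example, and writes it into the **writeExample** table.

```java
package com.s2.beam;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.UserDataMapper;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class S2WriteExample {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Read the "ID,Name" lines produced by S2ReadExample
        PCollection<String> lines = pipeline.apply(TextIO.read().from("/path/to/output.csv"));

        // Parse each line into an (ID, Name) pair
        PCollection<KV<Integer, String>> keyValues = lines.apply(
            MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
                .via((String line) -> {
                    String[] fields = line.split(",");
                    return KV.of(Integer.parseInt(fields[0]), fields[1]);
                }));

        // Load the pairs into the writeExample table
        keyValues.apply(SingleStoreIO.<KV<Integer, String>>write()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create("svchost")
                .withUsername("s2user")
                .withPassword("pa55w0rd")
                .withDatabase("dbExample"))
            .withTable("writeExample")
            .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
                @Override
                public List<String> mapRow(KV<Integer, String> element) {
                    List<String> result = new ArrayList<>();
                    result.add(element.getKey().toString());
                    result.add(element.getValue());
                    return result;
                }
            }));

        // Nothing executes until the pipeline is run
        pipeline.run().waitUntilFinish();
    }
}
```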

Run the following commands:

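```shell
mvn clean compile assembly:single
# Assumes the maven-assembly-plugin jar-with-dependencies descriptor, which produces this JAR name
java -cp target/s2BeamExample-1.0-jar-with-dependencies.jar com.s2.beam.S2WriteExample
```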

After the code runs successfully, run the following SQL query:

```sql
SELECT * FROM writeExample ORDER BY ID;
```

```output
+----+-------+
| ID | Name  |
+----+-------+
|  5 | Erik  |
|  6 | Finn  |
|  7 | Grace |
|  8 | Henry |
+----+-------+
```

The contents of the CSV file are now inserted into the **writeExample** table.
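
The parsing lambda in **S2WriteExample** calls `line.split(",")`, so a name that contains a comma would be cut off at its first comma. A more defensive variant splits only at the first comma with `split(",", 2)`. The sketch below shows that variant as a hypothetical standalone `LineParser.parse` helper; it returns a `Map.Entry` instead of Beam's `KV` so that it runs without Beam on the classpath, and it assumes the ID is always the first field.

```java
import java.util.Map;

// Hypothetical defensive variant of the S2WriteExample parsing lambda.
// Returns Map.Entry instead of Beam's KV so it runs without Beam.
final class LineParser {
    static Map.Entry<Integer, String> parse(String line) {
        // Limit 2: split only at the first comma, so commas inside Name are kept
        String[] fields = line.split(",", 2);
        if (fields.length != 2) {
            throw new IllegalArgumentException("Expected \"ID,Name\" but got: " + line);
        }
        return Map.entry(Integer.parseInt(fields[0].trim()), fields[1]);
    }

    public static void main(String[] args) {
        System.out.println(parse("5,Erik"));        // 5=Erik
        System.out.println(parse("9,Smith, John")); // 9=Smith, John
    }
}
```

Inside the pipeline, the same idea means replacing `line.split(",")` with `line.split(",", 2)` in the `MapElements` lambda.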

***

Modified at: September 26, 2025

Source: [/db/v9.1/load-data/integrate-with-singlestore/load-data-from-apache-beam/](https://docs.singlestore.com/db/v9.1/load-data/integrate-with-singlestore/load-data-from-apache-beam/)

