# Load Data from Apache Flink

Accelerate your workloads by combining Apache Flink's stream-processing capabilities with SingleStore's high-performance ingest. Use the SingleStore JDBC driver ("the driver") to connect to SingleStore from Apache Flink. The driver lets you ingest data into, and extract data from, SingleStore databases, and it integrates with Apache Flink's DataStream and Table APIs.

## Prerequisites

* An active SingleStore deployment.
* [The SingleStore JDBC Driver](https://docs.singlestore.com/db/v9.1/developer-resources/connect-with-application-development-tools/connect-with-java-jdbc/the-singlestore-jdbc-driver.md) installed with Maven.
* A Java Development Kit (JDK).

## Connect to SingleStore from Apache Flink

To connect to SingleStore from Apache Flink:

1. Update the `pom.xml` file of your Maven project to include the dependency for the SingleStore JDBC driver. For example:
   ```XML
   <dependency>
       <groupId>com.singlestore</groupId>
       <artifactId>singlestore-jdbc-client</artifactId>
       <version>1.2.4</version>
   </dependency>
   ```

2. Define the connection configuration for your SingleStore deployment using a `JdbcConnectionOptions` object in your application code.
   ```Java
   JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
       .withUrl("jdbc:singlestore://<hostname>:<port>/<database>")
       .withDriverName("com.singlestore.jdbc.Driver")
       .withUsername("<username>")
       .withPassword("<password>")
       .build();
   ```
   Update the following before running the application:

   * `hostname`: Hostname or IP address of your SingleStore deployment.
   * `port`: Port of the SingleStore deployment. Default is `3306`.
   * `database`: Name of the SingleStore database to connect with.
   * `username`: Name of the SingleStore database user with which to access the database.
   * `password`: Password for the SingleStore database user.

3. Connect to your SingleStore databases using the `JdbcConnectionOptions` object.
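
The placeholders in step 2 can be assembled into a JDBC URL with a small helper. The following is a minimal sketch: the `SingleStoreUrl` class and its `build` methods are hypothetical names introduced for illustration and are not part of the SingleStore JDBC driver or Apache Flink. It only assumes the `jdbc:singlestore://<hostname>:<port>/<database>` format and the default port `3306` described above.

```java
// Hypothetical helper for illustration only; not part of the driver or Flink.
final class SingleStoreUrl {
    // Default SingleStore port, as listed in the connection parameters above.
    static final int DEFAULT_PORT = 3306;

    private SingleStoreUrl() {
    }

    // Builds a URL of the form jdbc:singlestore://<hostname>:<port>/<database>.
    static String build(String hostname, int port, String database) {
        if (hostname == null || hostname.trim().isEmpty()) {
            throw new IllegalArgumentException("hostname is required");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port must be between 1 and 65535");
        }
        if (database == null || database.trim().isEmpty()) {
            throw new IllegalArgumentException("database is required");
        }
        return "jdbc:singlestore://" + hostname + ":" + port + "/" + database;
    }

    // Convenience overload that falls back to the default port.
    static String build(String hostname, String database) {
        return build(hostname, DEFAULT_PORT, database);
    }

    public static void main(String[] args) {
        // Prints: jdbc:singlestore://localhost:3306/dbTest
        System.out.println(build("localhost", "dbTest"));
    }
}
```

The returned string can be passed to `.withUrl(...)` when building the `JdbcConnectionOptions` object, which keeps the hostname, port, and database name in one place instead of scattering them through string literals.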

## Example

The following example connects to a SingleStore database from Apache Flink using the SingleStore JDBC driver and inserts data.

1. Connect to the target SingleStore deployment and run the following SQL commands. This example inserts data into the `dbTest.Stock` table.
   ```sql
   CREATE DATABASE dbTest;
   USE dbTest;

   CREATE TABLE Stock 
   ( ID INT, Code VARCHAR(4), Quantity INT );
   ```

2. Create a new Maven project in your local environment. For example:
   ```shell
   mvn archetype:generate -DgroupId=SingleStore \
   -DartifactId=FlinkSingleStoreConnection \
   -DarchetypeArtifactId=maven-archetype-quickstart \
   -DinteractiveMode=false
   ```

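3. Add the following dependency to the `pom.xml` file of the Maven project, along with any other dependencies that the application requires. The code in this example also imports classes from the Flink streaming API and the Flink JDBC connector.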
   ```xml
   <dependency>
       <groupId>com.singlestore</groupId>
       <artifactId>singlestore-jdbc-client</artifactId>
       <version>1.2.4</version>
   </dependency>
   ```

4. Build the Maven project from the parent directory using the following command.
   ```shell
   mvn clean package
   ```

5. Add the following code to a `flinkExample.java` file in your Maven project (for example, in the `src/main/java/com/SingleStore` directory). In this example, the `JdbcSink.sink()` method uses the `JdbcConnectionOptions` object to connect to the SingleStore deployment and insert three rows of data.

   **Note**: Before running the application, update the connection configuration for your SingleStore deployment in the `JdbcConnectionOptionsBuilder` calls.
   ```java
   import org.apache.flink.api.java.tuple.Tuple3;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.connector.jdbc.JdbcSink;
   import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
   import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

   public class flinkExample {
       public static void main(String[] args) throws Exception {

           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

           DataStream<Tuple3<Integer, String, Integer>> stock = env.fromElements(
                   Tuple3.of(101, "CVBN", 40),
                   Tuple3.of(102, "FGTR", 25),
                   Tuple3.of(103, "YTTK", 20)
           );

           JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                   .withUrl("jdbc:singlestore://<hostname>/dbTest")
                   .withDriverName("com.singlestore.jdbc.Driver")
                   .withUsername("<username>")
                   .withPassword("<password>")
                   .build();

           stock.addSink(JdbcSink.sink(
                   "INSERT INTO Stock (ID, Code, Quantity) VALUES (?, ?, ?)", // SQL Insert Query
                   (statement, row) -> {
                       statement.setInt(1, row.f0);
                       statement.setString(2, row.f1);
                       statement.setInt(3, row.f2);
                   },
                   JdbcExecutionOptions.builder()
                           .withBatchSize(1000)
                           .withBatchIntervalMs(200)
                           .build(),
                   connectionOptions
           ));

           // Execute Flink job
           env.execute("Flink Insert Example");
       }
   }
   ```

6. Build the Maven project using the following command. This command creates a `.jar` file in the `/target` directory of your project.
   ```shell
   mvn clean package
   ```

7. From the directory where Flink is extracted, run the `start-cluster.sh` script to start a Flink cluster. This example uses a local Flink cluster. Perform the following tasks:

   1. Change to the directory where Flink is extracted.
      ```shell
      cd /path_to_flink_directory/flink-<version>
      ```

   2. Run the following command to start a Flink cluster.
      ```shell
      ./bin/start-cluster.sh
      ```

8. Use the `.jar` file created in step 6 to submit a Flink job. Run the following command from the directory where Flink is extracted.
   ```shell
   ./bin/flink run /path/to/jar_file.jar
   ```

9. After the job completes, connect to the SingleStore database and run the following SQL query:
   ```sql
   SELECT * FROM Stock ORDER BY ID;
   ```
   ```output
   +------+------+----------+
   | ID   | Code | Quantity |
   +------+------+----------+
   |  101 | CVBN |       40 |
   |  102 | FGTR |       25 |
   |  103 | YTTK |       20 |
   +------+------+----------+
   ```
   The data has been successfully inserted.
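
The `JdbcExecutionOptions` in step 5 set a batch size of `1000` and a batch interval of `200` ms. The following sketch models how those two settings interact, assuming the sink flushes buffered rows when either threshold is reached. It is a simplified illustration, not Flink's implementation: the `BatchBuffer` class and its methods are hypothetical, and timestamps are passed in explicitly so that the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of size-or-interval batching; NOT Flink's implementation.
final class BatchBuffer<T> {
    private final int batchSize;
    private final long intervalMs;
    private final List<T> pending = new ArrayList<>();
    private final List<List<T>> flushed = new ArrayList<>();
    private long lastFlushMs;

    BatchBuffer(int batchSize, long intervalMs, long startMs) {
        this.batchSize = batchSize;
        this.intervalMs = intervalMs;
        this.lastFlushMs = startMs;
    }

    // Buffers a record and flushes immediately once the batch is full.
    void add(T record, long nowMs) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush(nowMs);
        }
    }

    // Timer tick: flushes pending records once the interval has elapsed.
    void tick(long nowMs) {
        if (!pending.isEmpty() && nowMs - lastFlushMs >= intervalMs) {
            flush(nowMs);
        }
    }

    // Moves the pending records into a new flushed batch.
    private void flush(long nowMs) {
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        lastFlushMs = nowMs;
    }

    // Returns the batches flushed so far, in order.
    List<List<T>> flushedBatches() {
        return flushed;
    }
}
```

Under this model, the three rows in the example never fill a batch of 1000, so they are written together as one batch when the 200 ms interval elapses. A smaller batch size or a shorter interval trades throughput for lower write latency.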

***

Modified at: September 26, 2025

Source: [/db/v9.1/load-data/integrate-with-singlestore/load-data-from-apache-flink/](https://docs.singlestore.com/db/v9.1/load-data/integrate-with-singlestore/load-data-from-apache-flink/)

