Load Data from Apache Flink

Accelerate your workloads by combining Apache Flink's stream-processing capabilities with SingleStore's high-performance ingest. Use the SingleStore JDBC driver ("the driver") to connect to SingleStore Helios from Apache Flink. The driver allows data to be ingested into, and extracted from, SingleStore databases, providing seamless integration with Apache Flink's DataStream and Table APIs.

Prerequisites

To connect to SingleStore from Apache Flink, perform the following tasks:

  1. Update the pom.xml file of your Maven project to include the dependency for the SingleStore JDBC driver. For example:

    <dependency>
        <groupId>com.singlestore</groupId>
        <artifactId>singlestore-jdbc-client</artifactId>
        <version>1.2.4</version>
    </dependency>
  2. Define the connection configuration for your SingleStore Helios deployment using a JdbcConnectionOptions object in your application code.

    JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:singlestore://<hostname>:<port>/<database>")
        .withDriverName("com.singlestore.jdbc.Driver")
        .withUsername("<username>")
        .withPassword("<password>")
        .build();

    Update the following before running the application:

    • hostname: Hostname or IP address of your SingleStore Helios deployment.

    • port: Port of the SingleStore Helios deployment. Default is 3306.

    • database: Name of the SingleStore database to connect to.

    • username: Name of the SingleStore database user used to access the database.

    • password: Password for the SingleStore database user.

  3. Connect to your SingleStore databases using the JdbcConnectionOptions object.
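The steps above cover writing to SingleStore; the driver can also extract data into a Flink DataStream. The following is a minimal sketch, not a definitive implementation: it assumes the flink-connector-jdbc dependency is on the classpath, and the Stock table, its column types, and the placeholder connection values are illustrative.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class FlinkReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build a bounded JDBC source; the query and column types are illustrative
        JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
            .setDrivername("com.singlestore.jdbc.Driver")
            .setDBUrl("jdbc:singlestore://<hostname>:<port>/<database>")
            .setUsername("<username>")
            .setPassword("<password>")
            .setQuery("SELECT ID, Code, Quantity FROM Stock")
            .setRowTypeInfo(new RowTypeInfo(
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.INT_TYPE_INFO))
            .finish();

        // Read the rows into a DataStream and print them
        DataStream<Row> rows = env.createInput(inputFormat);
        rows.print();

        env.execute("Flink Read Sketch");
    }
}
```

Because JdbcInputFormat produces a bounded source, this pattern suits one-off extraction jobs rather than continuous streaming reads.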

Example

The following example connects to a SingleStore database from Apache Flink using the SingleStore JDBC driver and inserts data.

  1. Connect to the target SingleStore deployment and run the following SQL commands. These commands create the dbTest database and the Stock table used in this example.

    CREATE DATABASE dbTest;
    USE dbTest;
    CREATE TABLE Stock
    ( ID INT, Code VARCHAR(4), Quantity INT );
  2. Create a new Maven project in your local environment. For example:

    mvn archetype:generate -DgroupId=SingleStore \
    -DartifactId=FlinkSingleStoreConnection \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DinteractiveMode=false
  3. Add the following dependency to the pom.xml file of the Maven project, along with any other dependencies your project requires.

    <dependency>
        <groupId>com.singlestore</groupId>
        <artifactId>singlestore-jdbc-client</artifactId>
        <version>1.2.4</version>
    </dependency>
  4. Build the Maven project from the parent directory using the following command.

    mvn clean package
  5. Add the following code to a flinkExample.java file in your Maven project (for example, in the src/main/java/com/SingleStore directory). In this example, the JdbcSink.sink() method uses the JdbcConnectionOptions object to connect to the SingleStore instance and insert three rows of data.

    Note: Update the connection configuration of your SingleStore deployment in the JdbcConnectionOptionsBuilder() constructor.

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

    public class flinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple3<Integer, String, Integer>> stock = env.fromElements(
                Tuple3.of(101, "CVBN", 40),
                Tuple3.of(102, "FGTR", 25),
                Tuple3.of(103, "YTTK", 20)
            );

            JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:singlestore://<hostname>/dbTest")
                .withDriverName("com.singlestore.jdbc.Driver")
                .withUsername("<username>")
                .withPassword("<password>")
                .build();

            stock.addSink(JdbcSink.sink(
                "INSERT INTO Stock (ID, Code, Quantity) VALUES (?, ?, ?)", // SQL INSERT query
                (statement, stockRecord) -> {
                    statement.setInt(1, stockRecord.f0);
                    statement.setString(2, stockRecord.f1);
                    statement.setInt(3, stockRecord.f2);
                },
                JdbcExecutionOptions.builder()
                    .withBatchSize(1000)
                    .withBatchIntervalMs(200)
                    .build(),
                connectionOptions
            ));

            // Execute the Flink job
            env.execute("Flink Insert Example");
        }
    }
  6. Rebuild the Maven project using the following command. This command creates a .jar file in the /target directory of your project.

    mvn clean package
  7. Run the start-cluster.sh script to start a Flink cluster. This example uses a local Flink cluster. Perform the following tasks:

    1. Change to the directory where Flink is extracted.

      cd /path_to_flink_directory/flink-<version>
    2. Run the following command to start a Flink cluster.

      ./bin/start-cluster.sh
  8. Submit a Flink job using the .jar file created in Step 6. Run the following command from the directory where Flink is extracted.

    ./bin/flink run /path/to/jar_file.jar
  9. After the job completes, connect to the SingleStore database and run the following SQL query:

    SELECT * FROM Stock ORDER BY ID;
    +------+------+----------+
    | ID   | Code | Quantity |
    +------+------+----------+
    |  101 | CVBN |       40 |
    |  102 | FGTR |       25 |
    |  103 | YTTK |       20 |
    +------+------+----------+

    The data has been successfully inserted.
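The same verification can also be done programmatically with plain JDBC through the driver. This is a minimal sketch, assuming the placeholder connection values used in the steps above; replace them with your deployment's details before running it.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifyInsert {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; substitute your own values
        String url = "jdbc:singlestore://<hostname>:<port>/dbTest";

        try (Connection conn = DriverManager.getConnection(url, "<username>", "<password>");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM Stock ORDER BY ID")) {
            // Print each inserted row: ID, Code, Quantity
            while (rs.next()) {
                System.out.printf("%d %s %d%n",
                    rs.getInt("ID"), rs.getString("Code"), rs.getInt("Quantity"));
            }
        }
    }
}
```

The try-with-resources block closes the connection, statement, and result set automatically, even if the query fails.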

Last modified: October 3, 2024
