Load Data from Apache Flink
Warning
SingleStore 9.0 gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features prior to their general availability. In the interim, SingleStore 8.9 is recommended for production workloads, which can later be upgraded to SingleStore 9.0.
Accelerate your workloads by combining Apache Flink's stream-processing capabilities with SingleStore's high-performance ingest.
Prerequisites
-
An active SingleStore deployment.
-
The SingleStore JDBC Driver installed with Maven.
-
Java JDK
Connect to SingleStore from Apache Flink
To connect to SingleStore from Apache Flink:
-
Update the pom.xml file of your Maven project to include the dependency for the SingleStore JDBC driver. For example:
<dependency>
    <groupId>com.singlestore</groupId>
    <artifactId>singlestore-jdbc-client</artifactId>
    <version>1.2.4</version>
</dependency>
-
Define the connection configuration for your SingleStore deployment using a JdbcConnectionOptions object in your application code.
JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:singlestore://<hostname>:<port>/<database>")
    .withDriverName("com.singlestore.jdbc.Driver")
    .withUsername("<username>")
    .withPassword("<password>")
    .build();
Update the following before running the application:
-
hostname: Hostname or IP address of your SingleStore deployment.
-
port: Port of the SingleStore deployment. Default is 3306.
-
database: Name of the SingleStore database to connect to.
-
username: Name of the SingleStore database user with which to access the database.
-
password: Password for the SingleStore database user.
-
Connect to your SingleStore databases using the JdbcConnectionOptions object.
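The connection URL described above can be assembled from its parts, as in the following minimal sketch. The SingleStoreJdbcUrl class and the SINGLESTORE_HOST and SINGLESTORE_DB environment variable names are hypothetical and are not part of the SingleStore JDBC driver or Flink. The sketch only illustrates the jdbc:singlestore://<hostname>:<port>/<database> format and the default port of 3306.

```java
/**
 * Hypothetical helper (an assumption for illustration, not a driver or Flink API)
 * that assembles a URL of the form jdbc:singlestore://<hostname>:<port>/<database>.
 */
final class SingleStoreJdbcUrl {
    /** Default SingleStore port, as stated in the parameter list above. */
    static final int DEFAULT_PORT = 3306;

    private SingleStoreJdbcUrl() {
    }

    /** Builds the URL using the default port. */
    static String build(String hostname, String database) {
        return build(hostname, DEFAULT_PORT, database);
    }

    /** Builds the URL with an explicit port, rejecting obviously invalid input. */
    static String build(String hostname, int port, String database) {
        if (hostname == null || hostname.trim().isEmpty()) {
            throw new IllegalArgumentException("hostname must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port must be between 1 and 65535");
        }
        if (database == null || database.trim().isEmpty()) {
            throw new IllegalArgumentException("database must not be empty");
        }
        return "jdbc:singlestore://" + hostname + ":" + port + "/" + database;
    }

    public static void main(String[] args) {
        // Environment variable names are assumptions; reading settings from the
        // environment keeps deployment details out of the source code.
        String host = System.getenv().getOrDefault("SINGLESTORE_HOST", "localhost");
        String database = System.getenv().getOrDefault("SINGLESTORE_DB", "dbTest");
        System.out.println(build(host, database));
    }
}
```

The returned string can then be passed to withUrl() when building the JdbcConnectionOptions object.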
Example
The following example connects to a SingleStore database from Apache Flink using the SingleStore JDBC driver and inserts data.
-
Connect to the target SingleStore deployment and run the following SQL commands.
This example inserts data into the dbTest.Stock table.
CREATE DATABASE dbTest;
USE dbTest;
CREATE TABLE Stock(
    ID INT,
    Code VARCHAR(4),
    Quantity INT
);
-
Create a new Maven project in your local environment.
For example:
mvn archetype:generate -DgroupId=SingleStore \
    -DartifactId=FlinkSingleStoreConnection \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DinteractiveMode=false
-
Add the following dependency to the pom.xml file of the Maven project, along with any other dependencies required.
<dependency>
    <groupId>com.singlestore</groupId>
    <artifactId>singlestore-jdbc-client</artifactId>
    <version>1.2.4</version>
</dependency>
-
Build the Maven project from the parent directory using the following command.
mvn clean package
-
Add the following code to a flinkExample.java file in your Maven project (for example, in the src/main/java/com/SingleStore directory).
In this example, the JdbcSink.sink() method uses the JdbcConnectionOptions object to connect to the SingleStore instance and insert three rows of data.
Note: Update the connection configuration of your SingleStore deployment in the JdbcConnectionOptionsBuilder() constructor.
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

public class flinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple3<Integer, String, Integer>> stock = env.fromElements(
            Tuple3.of(101, "CVBN", 40),
            Tuple3.of(102, "FGTR", 25),
            Tuple3.of(103, "YTTK", 20));

        JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:singlestore://<hostname>/dbTest")
            .withDriverName("com.singlestore.jdbc.Driver")
            .withUsername("<username>")
            .withPassword("<password>")
            .build();

        stock.addSink(JdbcSink.sink(
            "INSERT INTO Stock (ID, Code, Quantity) VALUES (?, ?, ?)", // SQL Insert Query
            (statement, user) -> {
                statement.setInt(1, user.f0);
                statement.setString(2, user.f1);
                statement.setInt(3, user.f2);
            },
            JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .build(),
            connectionOptions));

        // Execute Flink job
        env.execute("Flink Insert Example");
    }
}
-
Build the Maven project using the following command.
This command creates a .jar file in the /target directory of your project.
mvn clean package
-
From the directory where Flink is extracted, run the start-cluster.sh script to start a Flink cluster. This example uses a local Flink cluster. Perform the following tasks:
-
Change to the directory where Flink is extracted.
cd /path_to_flink_directory/flink-<version>
-
Run the following command to start a Flink cluster.
./bin/start-cluster.sh
-
Use the .jar file created in the build step to submit a Flink job. Run the following command from the directory where Flink is extracted.
./bin/flink run /path/to/jar_file.jar
-
After the job completes, connect to the SingleStore database and run the following SQL query:
SELECT * FROM Stock ORDER BY ID;
+------+------+----------+
| ID   | Code | Quantity |
+------+------+----------+
|  101 | CVBN |       40 |
|  102 | FGTR |       25 |
|  103 | YTTK |       20 |
+------+------+----------+
The data has been successfully inserted.
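The example configures JdbcExecutionOptions with withBatchSize(1000) and withBatchIntervalMs(200). As the Flink JDBC connector documentation describes these options, the sink buffers rows and writes them once the batch size is reached or the interval elapses. With only three rows, the size threshold is never reached, so the write is presumably triggered by the interval or by the job finishing. The following sketch models only the size-based part of that behavior in plain Java. The BatchBuffer class is hypothetical and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Simplified, hypothetical model of size-based batching.
 * It is an illustration only and does not reproduce Flink's JdbcSink internals.
 */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> writer;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> writer) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.writer = writer;
    }

    /** Buffers one row and flushes as soon as the buffer holds batchSize rows. */
    void add(T row) {
        pending.add(row);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Writes all buffered rows as one batch; a timer or shutdown hook would also call this. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        writer.accept(new ArrayList<>(pending));
        pending.clear();
    }

    /** Number of rows buffered but not yet written. */
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        // Batch size 1000 mirrors the example; the writer just reports the batch size.
        BatchBuffer<String> buffer = new BatchBuffer<>(1000,
            batch -> System.out.println("Writing batch of " + batch.size() + " rows"));
        buffer.add("101,CVBN,40");
        buffer.add("102,FGTR,25");
        buffer.add("103,YTTK,20");
        // Three rows never reach the size threshold, so an explicit flush stands in
        // for the interval-based or end-of-job flush.
        buffer.flush();
    }
}
```

For larger streams, a bigger batch size generally means fewer round trips to the database, while a shorter interval reduces how long rows wait in the buffer before they are written.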
Last modified: October 3, 2024