Load Data from Apache Flink
Accelerate your workloads by combining Apache Flink's stream-processing capabilities with SingleStore's high-performance ingest.
Prerequisites
-
An active SingleStore Helios deployment.
-
The SingleStore JDBC Driver installed with Maven.
-
Java JDK
Connect to SingleStore from Apache Flink
To connect to SingleStore from Apache Flink:
-
Update the pom.xml file of your Maven project to include the dependency for the SingleStore JDBC driver. For example:
    <dependency>
        <groupId>com.singlestore</groupId>
        <artifactId>singlestore-jdbc-client</artifactId>
        <version>1.2.4</version>
    </dependency>
-
Define the connection configuration for your SingleStore Helios deployment using a JdbcConnectionOptions object in your application code.
    JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:singlestore://<hostname>:<port>/<database>")
        .withDriverName("com.singlestore.jdbc.Driver")
        .withUsername("<username>")
        .withPassword("<password>")
        .build();
Update the following before running the application:
-
hostname: Hostname or IP address of your SingleStore Helios deployment.
-
port: Port of the SingleStore Helios deployment. Default is 3306.
-
database: Name of the SingleStore database to connect to.
-
username: Name of the SingleStore database user used to access the database.
-
password: Password for the SingleStore database user.
-
Connect to your SingleStore databases using the JdbcConnectionOptions object.
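The connection parameters listed above can also be assembled in code rather than edited by hand. The following is a minimal sketch, assuming a hypothetical helper class named SingleStoreJdbcUrl (it is not part of the SingleStore JDBC driver or of Flink). It builds the string that would be passed to withUrl() and falls back to the default port 3306 when no port is supplied. The hostname used in main is a made-up example value.

```java
public class SingleStoreJdbcUrl {
    // Default port, as stated in the parameter list above.
    public static final int DEFAULT_PORT = 3306;

    /**
     * Builds a URL of the form jdbc:singlestore://<hostname>:<port>/<database>.
     * Pass null for port to use the default.
     */
    public static String build(String hostname, Integer port, String database) {
        if (hostname == null || hostname.isEmpty()) {
            throw new IllegalArgumentException("hostname is required");
        }
        if (database == null || database.isEmpty()) {
            throw new IllegalArgumentException("database is required");
        }
        int effectivePort = (port == null) ? DEFAULT_PORT : port;
        return "jdbc:singlestore://" + hostname + ":" + effectivePort + "/" + database;
    }

    public static void main(String[] args) {
        // Hypothetical hostname, for illustration only.
        System.out.println(build("db.example.com", null, "dbTest"));
    }
}
```

The returned string can be passed to withUrl() in place of the hard-coded URL. Validating the hostname and database up front surfaces a missing value when the job is built instead of when the first batch is flushed.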
Example
The following example connects to a SingleStore database from Apache Flink using the SingleStore JDBC driver and inserts data.
-
Connect to the target SingleStore deployment and run the following SQL commands.
This example inserts data into the dbTest.Stock table.
    CREATE DATABASE dbTest;
    USE dbTest;
    CREATE TABLE Stock(
        ID INT,
        Code VARCHAR(4),
        Quantity INT
    );
-
Create a new Maven project in your local environment. For example:
    mvn archetype:generate -DgroupId=SingleStore \
        -DartifactId=FlinkSingleStoreConnection \
        -DarchetypeArtifactId=maven-archetype-quickstart \
        -DinteractiveMode=false
-
Add the following dependency to the pom.xml file of the Maven project, along with any other dependencies your application requires (such as the Flink streaming and JDBC connector libraries that the example code imports).
    <dependency>
        <groupId>com.singlestore</groupId>
        <artifactId>singlestore-jdbc-client</artifactId>
        <version>1.2.4</version>
    </dependency>
-
Build the Maven project from the parent directory using the following command.
    mvn clean package
-
Add the following code to a flinkExample.java file in your Maven project (for example, in the src/main/java/com/SingleStore directory). In this example, the JdbcSink.sink() method uses the JdbcConnectionOptions object to connect to the SingleStore instance and insert three rows of data.
Note: Update the connection configuration of your SingleStore deployment in the calls chained on the JdbcConnectionOptionsBuilder() constructor.
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

    public class flinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Three sample rows: (ID, Code, Quantity)
            DataStream<Tuple3<Integer, String, Integer>> stock = env.fromElements(
                Tuple3.of(101, "CVBN", 40),
                Tuple3.of(102, "FGTR", 25),
                Tuple3.of(103, "YTTK", 20)
            );

            JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:singlestore://<hostname>/dbTest")
                .withDriverName("com.singlestore.jdbc.Driver")
                .withUsername("<username>")
                .withPassword("<password>")
                .build();

            stock.addSink(JdbcSink.sink(
                "INSERT INTO Stock (ID, Code, Quantity) VALUES (?, ?, ?)", // SQL insert query
                // Bind each tuple field to the matching placeholder
                (statement, row) -> {
                    statement.setInt(1, row.f0);
                    statement.setString(2, row.f1);
                    statement.setInt(3, row.f2);
                },
                // Flush after 1000 buffered rows or every 200 ms
                JdbcExecutionOptions.builder()
                    .withBatchSize(1000)
                    .withBatchIntervalMs(200)
                    .build(),
                connectionOptions
            ));

            // Execute Flink job
            env.execute("Flink Insert Example");
        }
    }
-
Build the Maven project using the following command. This command creates a .jar file in the /target directory of your project.
    mvn clean package
-
From the directory where Flink is extracted, run the start-cluster.sh script to start a Flink cluster. This example uses a local Flink cluster. Perform the following tasks:
-
Change to the directory where Flink is extracted.
    cd /path_to_flink_directory/flink-<version>
-
Run the following command to start a Flink cluster.
    ./bin/start-cluster.sh
-
Use the .jar file created by the Maven build to submit a Flink job. Run the following command from the directory where Flink is extracted.
    ./bin/flink run /path/to/jar_file.jar
-
After the job completes, connect to the SingleStore database and run the following SQL query:
    SELECT * FROM Stock ORDER BY ID;
    +------+------+----------+
    | ID   | Code | Quantity |
    +------+------+----------+
    |  101 | CVBN |       40 |
    |  102 | FGTR |       25 |
    |  103 | YTTK |       20 |
    +------+------+----------+
The data has been successfully inserted.
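The same check can be run from Java instead of a SQL client. The following is a minimal sketch using plain JDBC, assuming the SingleStore JDBC driver is on the classpath. The class name VerifyStock and the environment variable names SINGLESTORE_JDBC_URL, SINGLESTORE_USER, and SINGLESTORE_PASSWORD are hypothetical choices for this illustration and are not defined by SingleStore or Flink.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class VerifyStock {
    // Formats one Stock row for printing; kept separate so it can be checked without a database.
    public static String formatRow(int id, String code, int quantity) {
        return String.format("%d | %s | %d", id, code, quantity);
    }

    public static void main(String[] args) throws SQLException {
        // Hypothetical environment variable names (assumption, see the text above).
        // The URL has the form jdbc:singlestore://<hostname>:<port>/dbTest
        String url = System.getenv("SINGLESTORE_JDBC_URL");
        String user = System.getenv("SINGLESTORE_USER");
        String password = System.getenv("SINGLESTORE_PASSWORD");
        if (url == null || user == null || password == null) {
            System.err.println("Set SINGLESTORE_JDBC_URL, SINGLESTORE_USER, and SINGLESTORE_PASSWORD to run the check.");
            return;
        }

        // try-with-resources closes the result set, statement, and connection in reverse order.
        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT ID, Code, Quantity FROM Stock ORDER BY ID")) {
            while (rs.next()) {
                System.out.println(formatRow(rs.getInt("ID"), rs.getString("Code"), rs.getInt("Quantity")));
            }
        }
    }
}
```

Reading credentials from the environment keeps them out of the source file. If the Flink job ran as described, the loop should print one line for each of the three inserted rows.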
Last modified: October 3, 2024