JDBC Connector
This connector provides a sink that writes data to a JDBC database.
To use it, add the following dependency to your project (along with your JDBC-driver):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.0</version>
</dependency>
Note that the streaming connectors are currently NOT part of the binary distribution. See how to link with them for cluster execution here.
Created JDBC sink provides at-least-once guarantee. Effectively exactly-once can be achived using upsert statements or idempotent updates.
Example usage:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(...)
.addSink(JdbcSink.sink(
"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
(ps, t) -> {
ps.setInt(1, t.id);
ps.setString(2, t.title);
ps.setString(3, t.author);
ps.setDouble(4, t.price);
ps.setInt(5, t.qty);
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(getDbMetadata().getUrl())
.withDriverName(getDbMetadata().getDriverClass())
.build()));
env.execute();
Please refer to the API documentation for more details.