示例:逻辑复制代码示例
下面的示例演示如何通过JDBC接口使用逻辑复制功能。
//逻辑复制功能示例:文件名,LogicalReplicationDemo.java
//前提条件:添加JDBC用户机器IP到数据库白名单里,在pg_hba.conf添加以下内容即可:
//假设JDBC用户IP为10.10.10.10
//host all all 10.10.10.10/32 sha256
//host replication all 10.10.10.10/32 sha256
import org.postgresql.PGProperty;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates the logical replication API of the JDBC driver.
 *
 * <p>Depending on the {@code testMode} selected in {@link #main(String[])}, the program
 * creates a logical replication slot, streams changes from an existing slot, or drops
 * the slot. The slot used throughout is named {@code replication_slot}.
 */
public class LogicalReplicationDemo {
    /** Test mode: create the logical replication slot. */
    private static final int TEST_MODE_CREATE_SLOT = 1;
    /** Test mode: start logical replication (the slot must already exist). */
    private static final int TEST_MODE_START_REPL = 2;
    /** Test mode: drop the logical replication slot. */
    private static final int TEST_MODE_DROP_SLOT = 3;

    /** Name of the logical replication slot used by every mode. */
    private static final String SLOT_NAME = "replication_slot";

    /**
     * Entry point: connects in replication mode and runs the selected test mode.
     *
     * <p>Failures are printed to standard error and end the program. The connection is
     * always closed before the method returns.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String driver = "org.postgresql.Driver";
        // Configure the database IP and port here.
        String sourceURL = "jdbc:postgresql://$ip:$port/postgres";
        // Select which test mode to run.
        int testMode = TEST_MODE_START_REPL;

        try {
            Class.forName(driver);
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        Properties properties = new Properties();
        PGProperty.USER.set(properties, "user");
        PGProperty.PASSWORD.set(properties, "passwd");
        // The following three properties are mandatory for logical replication.
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");

        // try-with-resources guarantees the connection is released on every path,
        // including when slot creation, streaming, or slot removal fails.
        try (PgConnection conn =
                (PgConnection) DriverManager.getConnection(sourceURL, properties)) {
            System.out.println("connection success!");
            if (testMode == TEST_MODE_CREATE_SLOT) {
                conn.getReplicationAPI()
                    .createReplicationSlot()
                    .logical()
                    .withSlotName(SLOT_NAME)
                    .withOutputPlugin("test_decoding")
                    .make();
            } else if (testMode == TEST_MODE_START_REPL) {
                // The replication slot must have been created before using this mode.
                streamChanges(conn);
            } else if (testMode == TEST_MODE_DROP_SLOT) {
                conn.getReplicationAPI()
                    .dropReplicationSlot(SLOT_NAME);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Opens a logical replication stream on {@code replication_slot} and prints every
     * received message to standard output until an error or interruption occurs.
     *
     * @param conn an open connection established in replication mode
     * @throws java.sql.SQLException if starting the stream or reading from it fails
     * @throws InterruptedException if the thread is interrupted while waiting for data
     */
    private static void streamChanges(PgConnection conn)
            throws java.sql.SQLException, InterruptedException {
        // Example start position; adjust to the LSN the consumer should resume from.
        LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568");
        PGReplicationStream stream = conn
            .getReplicationAPI()
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .withStartPosition(waitLSN)
            .start();
        try {
            while (true) {
                ByteBuffer byteBuffer = stream.readPending();
                if (byteBuffer == null) {
                    // Nothing pending yet: back off briefly before polling again.
                    TimeUnit.MILLISECONDS.sleep(10L);
                    continue;
                }
                int offset = byteBuffer.arrayOffset();
                byte[] source = byteBuffer.array();
                int length = source.length - offset;
                // NOTE(review): decodes with the platform default charset - confirm it
                // matches the encoding of the decoded output.
                System.out.println(new String(source, offset, length));
                // To flush the LSN, call the following APIs as the business logic requires:
                // LogSequenceNumber lastRecv = stream.getLastReceiveLSN();
                // stream.setFlushedLSN(lastRecv);
                // stream.forceUpdateStatus();
            }
        } finally {
            // Close the stream when the loop exits abnormally; a failure while closing is
            // reported but must not mask the exception that ended the loop.
            try {
                stream.close();
            } catch (Exception closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}