OceanBase 数据链路 Concept
OceanBase 数据链路提供了从用户端到数据库端的最佳数据链路访问功能,屏蔽用户对分布式数据库的感知,保障分布式数据库的最高性能服务。数据链路包含两个组件:数据库代理和数据库驱动。
数据库代理
OceanBase 数据库代理 ODP(OceanBase Database Proxy,又称 OBProxy)是 OceanBase 专用的代理服务器,OceanBase 用户的数据会以多副本的形式存放在各个 OBServer 上,ODP 则负责接收用户发过来的 SQL 请求,转发用户 SQL 请求到最佳目标 OBServer 上,并将执行结果返回给客户。
作为 OceanBase 数据库的关键组件,ODP 具有以下特性:
- 高性能转发:ODP 完整兼容 MySQL 协议,并支持 OceanBase 自研协议,采用多线程异步框架和透明流式转发的设计,保证了数据的高性能转发,同时确保了自身对机器资源的最小消耗。
- 最佳路由:ODP 会充分考虑用户请求涉及的副本位置、用户配置的读写分离路由策略、OceanBase 多地部署的最优链路,以及 OceanBase 各机器的状态及负载情况,将用户的请求路由到最佳的 OBServer,最大程度的保证了 OceanBase 整体的高性能运转。
- 连接管理:针对一个客户端的物理连接,ODP 维持自身到后端多个 OBServer 的连接,采用基于版本号的增量同步方案维持了每个 OBServer 连接的会话状态,保证了客户端高效访问各个 OBServer。
- 专有协议:ODP 与 OBServer 默认采用了 OceanBase 专有协议,如增加报文的 CRC 校验保证与 OBServer 链路的正确性,增强传输协议以支持 Oracle 兼容性的数据类型和交互模型
- 易运维:ODP 本身无状态支持无限水平扩展,支持同时访问多个 OceanBase 集群。可以通过丰富的内部命令实现对自身状态的实时监控,提供极大的运维便利性
数据库驱动
在 OceanBase MySQL 模式下,用户可以直接使用 MySQL 官方提供的 Connector 来使用 OceanBase 数据库(暂不支持 8.0 的驱动),在 OceanBase Oracle 模式下,需要使用 OceanBase 自研的数据库驱动。OceanBase 数据库驱动同时支持 OceanBase 的 MySQL/Oracle 两种协议,在使用时可以自动识别 OceanBase 的运行模式是 MySQL 还是 Oracle,无需额外设置。OceanBase 支持各类语言的数据库驱动,下面介绍几种常见的语言及标准。
OCI 驱动
OceanBase 提供了基于 OCI 接口的数据库驱动,兼容了 Oracle 数据库 OCI 接口的方法名和函数名,对于试用 OCI 接口编程的应用程序在进行移植时,无需做大量业务改造即可非常方便的适配到 OceanBase 数据库。
样例代码:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <malloc.h>
#include "oci.h"
/*声明句柄*/
OCIEnv *envhp; /*环境句柄*/
OCISvcCtx *svchp; /*服务环境句柄*/
OCIServer *srvhp; /*服务器句柄*/
OCISession *authp; /*会话句柄*/
OCIStmt *stmthp; /*语句句柄*/
OCIDescribe *dschp; /*描述句柄*/
OCIError *errhp; /*错误句柄*/
OCIDefine *defhp[3]; /*定义句柄*/
OCIBind *bidhp[4]; /*绑定句柄*/
sb2 ind[3]; /*指示符变量*/
/*绑定select结果集的参数*/
text szpersonid[9]; /*存储personid列*/
text szsex[2]; /*存储sex列*/
text szname[51]; /*存储name列*/
text szemail[51]; /*存储mail列*/
text szphone[26]; /*存储phone列*/
char sql[256]; /*存储执行的sql语句*/
int main(int argc, char *argv[])
{
char strServerName[50];
char strUserName[50];
char strPassword[50];
/*设置服务器,用户名和密码*/
strcpy(strServerName, "host:port/db");
strcpy(strUserName, "user");
strcpy(strPassword, "pwd");
/*初始化OCI应用环境*/
OCIInitialize(OCI_DEFAULT, NULL, NULL, NULL, NULL);
/*初始化环境句柄*/
OCIEnvInit(&envhp, OCI_DEFAULT, 0, 0);
/*分配句柄*/
OCIHandleAlloc(envhp, (dvoid **)&svchp, OCI_HTYPE_SVCCTX, 0, 0);
/*服务器环境句柄*/
OCIHandleAlloc(envhp, (dvoid **)&srvhp, OCI_HTYPE_SERVER, 0, 0);
/*服务器句柄*/
OCIHandleAlloc(envhp, (dvoid **)&authp, OCI_HTYPE_SESSION, 0, 0);
/*会话句柄*/
OCIHandleAlloc(envhp, (dvoid **)&errhp, OCI_HTYPE_ERROR, 0, 0);
/*错误句柄*/
OCIHandleAlloc(envhp, (dvoid **)&dschp, OCI_HTYPE_DESCRIBE, 0, 0);
/*描述符句柄*/
/*连接服务器*/
OCIServerAttach(srvhp, errhp, (text *)strServerName,
(sb4)strlen(strServerName), OCI_DEFAULT);
/*设置用户名和密码*/
OCIAttrSet(authp, OCI_HTYPE_SESSION, (text *)strUserName,
(ub4)strlen(strUserName), OCI_ATTR_USERNAME, errhp);
OCIAttrSet(authp, OCI_HTYPE_SESSION, (text *)strPassword,
(ub4)strlen(strPassword), OCI_ATTR_PASSWORD, errhp);
/*设置服务器环境句柄属性*/
OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
(dvoid *)srvhp, (ub4)0, OCI_ATTR_SERVER, errhp);
OCIAttrSet(svchp, OCI_HTYPE_SVCCTX, (dvoid *)authp,
0, OCI_ATTR_SESSION, errhp);
/*创建并开始一个用户会话*/
OCISessionBegin(svchp, errhp, authp, OCI_CRED_RDBMS, OCI_DEFAULT);
OCIHandleAlloc(envhp, (dvoid **)&stmthp, OCI_HTYPE_STMT, 0, 0);
/*语句句柄*/
/************************************************************************/
/*查询person表*/
/************************************************************************/
strcpy(sql, "select personid ,name,phone from person");
/*准备SQL语句*/
OCIStmtPrepare(stmthp, errhp, (text *)sql, strlen(sql), OCI_NTV_SYNTAX, OCI_DEFAULT);
/*绑定输出列*/
OCIDefineByPos(stmthp, &defhp[0], errhp, 1, (ub1 *)szpersonid,
sizeof(szpersonid), SQLT_STR, &ind[0], 0, 0, OCI_DEFAULT);
OCIDefineByPos(stmthp, &defhp[1], errhp, 2, (ub1 *)szname,
sizeof(szname), SQLT_STR, &ind[1], 0, 0, OCI_DEFAULT);
OCIDefineByPos(stmthp, &defhp[2], errhp, 3, (ub1 *)szphone,
sizeof(szphone), SQLT_STR, &ind[2], 0, 0, OCI_DEFAULT);
/*执行SQL语句*/
OCIStmtExecute(svchp, stmthp, errhp, (ub4)0, 0, NULL, NULL,
OCI_DEFAULT);
printf("%-10s%-10s%-10s\n", "PERSONID", "NAME", "PHONE");
while ((OCIStmtFetch(stmthp,
errhp, 1, OCI_FETCH_NEXT, OCI_DEFAULT)) != OCI_NO_DATA)
{
printf("%-10s, %-10s,%-10s\n", szpersonid, szname, szphone);
}
//结束会话
OCISessionEnd(svchp, errhp, authp, (ub4)0);
//断开与数据库的连接
OCIServerDetach(srvhp, errhp, OCI_DEFAULT);
//释放OCI句柄
OCIHandleFree((dvoid *)dschp, OCI_HTYPE_DESCRIBE);
OCIHandleFree((dvoid *)stmthp, OCI_HTYPE_STMT);
OCIHandleFree((dvoid *)errhp, OCI_HTYPE_ERROR);
OCIHandleFree((dvoid *)authp, OCI_HTYPE_SESSION);
OCIHandleFree((dvoid *)svchp, OCI_HTYPE_SVCCTX);
OCIHandleFree((dvoid *)srvhp, OCI_HTYPE_SERVER);
return 0;
}
JDBC 驱动
OceanBase 提供了基于 Java JDBC 标准的数据库驱动,JDBC(Java Database Connectivity) 是 Java 应用程序访问数据库的标准 API(应用程序编程接口),数据库驱动的实现会将 JDBC 标准编程接口转换成对应数据库厂商的 SQL 实现。OceanBase JDBC 驱动兼容 JDBC 4.0、4.1、4.2 标准。
JDBC 连接串的前置为 jdbc:oceanbase,其他使用和标准 JDBC 方式保持一致。样例代码:
String url = "jdbc:oceanbase://host:port/SYS?useUnicode=true&characterEncoding=utf-8";
String username = "SYS@oracle";
String password = "";
Connection conn = null;
try {
Class.forName("com.alipay.oceanbase.jdbc.Driver");
conn = DriverManager.getConnection(url, username, password);
PreparedStatement ps = conn.prepareStatement("select to_char(sysdate,'yyyy-MM-dd HH24:mi:ss') from dual;");
ResultSet rs = ps.executeQuery();
rs.next();
System.out.println("sysdate is:" + rs.getString(1));
rs.close();
ps.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != conn) {
conn.close();
}
}
ODBC 驱动
OceanBase 提供了基于 ODBC 标准的数据库驱动,ODBC (Open Database Connectivity)和 JDBC 标准一样,提供了一种标准的API(应用程序编程接口)方法来访问数据库管理系统,目前已经被业界广泛接受。
目前 OceanBase 提供了 Linux 版本和 Windows 版本的 ODBC 驱动,样例代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "sql.h"
#include "sqlext.h"
#define MAX_NAME_LEN 255
typedef struct tagODBCHandler {
SQLHENV henv;
SQLHDBC hdbc;
SQLHSTMT hstmt;
}ODBCHandler;
int IS_SUCC(SQLRETURN retcode) {
if (retcode == SQL_SUCCESS || retcode == SQL_SUCCESS_WITH_INFO) return 1;
return 0;
}
int main(int argc, char** argv) {
ODBCHandler handler;
SQLRETURN retcode;
SQLCHAR connOut[MAX_NAME_LEN+1];
SQLSMALLINT len;
//Allocate environment handle
retcode = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &handler.henv);
// Set the ODBC version environment attribute
if (!IS_SUCC(retcode)) {
return -1;
}
retcode = SQLSetEnvAttr(handler.henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3_80, 0);
// Allocate connection handle
if (!IS_SUCC(retcode)) {
return -1;
}
retcode = SQLAllocHandle(SQL_HANDLE_DBC, handler.henv, &handler.hdbc);
if (!IS_SUCC(retcode)) {
return -1;
}
// Set login timeout to 5 seconds
SQLSetConnectAttr(handler.hdbc, SQL_LOGIN_TIMEOUT, (SQLPOINTER)5, 0);
// Connect to data source
retcode = SQLDriverConnect(handler.hdbc, NULL, (SQLCHAR*)"DSN=odbctest", SQL_NTS, connOut, MAX_NAME_LEN, &len,SQL_DRIVER_NOPROMPT);
if (!IS_SUCC(retcode)) {
return -1;
}
retcode = SQLAllocHandle(SQL_HANDLE_STMT, handler.hdbc, &handler.hstmt);
if (!IS_SUCC(retcode)) {
return -1;
}
{
//insert
retcode = SQLPrepare(handler.hstmt, (SQLCHAR*)"insert into PERSON values(?,?,'test')", SQL_NTS);
if (!IS_SUCC(retcode)) {
return -1;
}
SQLINTEGER id = 0;
SQLINTEGER num = 0;
retcode = SQLBindParameter(handler.hstmt, 1, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER, 0, 0, &id, 0, NULL);
if (!IS_SUCC(retcode)) {
return -1;
}
retcode = SQLBindParameter(handler.hstmt, 2, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER, 0, 0, &num, 0, NULL);
if (!IS_SUCC(retcode)) {
return -1;
}
retcode = SQLExecute(handler.hstmt);
if (!IS_SUCC(retcode)) {
return -1;
}
}
// clean handle
SQLFreeHandle(SQL_HANDLE_STMT, handler.hstmt);
SQLDisconnect(handler.hdbc);
SQLFreeHandle(SQL_HANDLE_DBC, handler.hdbc);
SQLFreeHandle(SQL_HANDLE_ENV, handler.henv);
return 0;
}
.NET 驱动
OceanBase 提供了基于 .NET 的数据库驱动,驱动提供了对 Entity Framework Core 和 Entity Framework 6.x 的兼容能力,用户可以使用 OceanBase .NET 驱动结合 Entity Framework 快速开发应用程序。.NET 驱动兼容 .NET Framework 4.5、.NET Framework 4.6、.NET Core 2.0 版本。
样例代码:
using OceanBase.Data.OceanBaseClient;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ObOracleDemo
{
class Program
{
static void Main(string[] args)
{
using (MySqlConnection c = new MySqlConnection(
"server=100.88.105.219;port=63035;user id=SYS@tt_ww_oracle;database=SYS;Connection timeout=5;" +
"pooling=false;includesecurityasserts=false;characterset=utf8"))
{
DateTime start = DateTime.Now;
try
{
c.Open();
Console.WriteLine("connect ob oracle successfully!");
MySqlCommand cmd = new MySqlCommand("drop table test_oracle", c);
try
{
cmd.ExecuteNonQuery();
}
catch (Exception ex)
{
Console.WriteLine("exception happened, error is");
Console.WriteLine(ex.Message);
Console.WriteLine(ex.StackTrace);
}
cmd.Dispose();
cmd = new MySqlCommand("create table test_oracle (c1 int primary key, c2 varchar(256), c3 timestamp(9))", c);
cmd.ExecuteNonQuery();
cmd.Dispose();
IDbCommand psCmd = c.CreateCommand();
psCmd.CommandText = "insert into test_oracle (c1, c2, c3) values (?p1,?p2, ?p3)";
IDbDataParameter p1 = psCmd.CreateParameter();
p1.ParameterName = "?p1";
p1.DbType = DbType.Int32;
p1.Precision = (byte)10;
p1.Scale = (byte)0;
p1.Size = 4;
psCmd.Parameters.Add(p1);
p1.Value = 10;
IDbDataParameter p2 = psCmd.CreateParameter();
p2.ParameterName = "?p2";
p2.DbType = DbType.String;
psCmd.Parameters.Add(p2);
p2.Value = "Hello C#";
MySqlParameter p3 = new MySqlParameter();
p3.ParameterName = "?p3";
p3.MySqlDbType = MySqlDbType.DateTime;
p3.Value = "2020-03-09 21:23:22.878879";
psCmd.Parameters.Add(p3);
psCmd.Prepare();
psCmd.ExecuteNonQuery();
psCmd.Dispose();
psCmd = c.CreateCommand();
psCmd.CommandText = "select c1,c2,c3 from test_oracle";
psCmd.Prepare();
MySqlDataReader reader = (MySqlDataReader)psCmd.ExecuteReader();
reader.Read();
Console.WriteLine("c1=" + reader.GetInt32(0));
Console.WriteLine("c2=" + reader.GetString(1));
Console.WriteLine("c3=" + reader.GetDateTime(2));
psCmd.Dispose();
}
catch (Exception ex)
{
Console.WriteLine("exception happened, error is");
Console.WriteLine(ex.Message);
Console.WriteLine(ex.StackTrace);
}
finally
{
Console.WriteLine("Press Any Key To Continue...");
Console.ReadKey();
}
}
}
}
}