创建Table实例
通过PegasusClientInterface::openTable()
方法获取PegasusTableInterface的对象实例:
- /**
- * Open a table. Please notice that pegasus support two kinds of API:
- * 1. the client-interface way, which is provided in this class.
- * 2. the table-interface way, which is provided by {@link PegasusTableInterface}.
- * With the client-interface, you don't need to create PegasusTableInterface by openTable, so
- * you can access the pegasus cluster conveniently. However, the client-interface's api also has
- * some restrictions:
- * 1. we don't provide async methods in client-interface.
- * 2. the timeout in client-interface isn't as accurate as the table-interface.
- * 3. the client-interface may throw an exception when open table fails. It means that
- * you may need to handle this exception in every data access operation, which is annoying.
- * 4. You can't specify a per-operation timeout.
- * So we recommend you to use the table-interface.
- *
- * @param tableName the table should be exist on the server, which is created before by
- * the system administrator
- * @return the table handler
- * @throws PException
- */
- public PegasusTableInterface openTable(String tableName) throws PException;
注:
- 如果网络超时或者表不存在,都会抛出异常。使用示例:
- PegasusTableInterface table = client.openTable(tableName);
PegasusTableInterface中同时提供了同步和异步的API。
同步API与PegasusClientInterface基本一致,区别在于:不用指定tableName参数;可以单独指定超时时间。
基于Future的异步API
异步API使用Future模式,具体来说是使用的 io.netty.util.concurrent.Future (参见 https://netty.io/4.1/api/index.html )。每个异步接口的返回值都是一个Future<T>,其中T是该操作返回结果的类型。Future具有如下特性:
- 可以通过 addListener() 方法设置一个或者多个Listener,即异步回调函数。回调函数会在操作完成时被调用;如果在add时操作已经完成,回调函数就会被立即调用;回调函数被调用的顺序与添加的顺序一致。
- 可以通过 await() 方法等待操作完成。但是注意的是await()方法只能保证操作完成以及下面的三个方法可用,并不能保证回调函数已经被执行。
- 在操作完成后,可以通过 isSuccess() 方法判断操作是否成功;如果成功,可以通过 getNow() 方法获取结果;如果失败,可以通过 cause() 方法获取异常。注意:第一次调用一个表的异步API的时候,函数返回之前可能会有一些额外延迟(典型地10ms左右),这是因为第一次调用时需要从meta-server获取表的信息和路由信息。
一个典型的异步使用样例:
- // 获取Table实例
- PegasusTableInterface table = client.openTable(tableName);
- // 发起异步调用
- Future<Boolean> future = table.asyncExist(hashKey, sortKey, 0);
- // 设置回调函数
- future.addListener(
- new ExistListener() {
- public void operationComplete(Future<Boolean> future) throws Exception {
- if (future.isSuccess()) {
- Boolean result = future.getNow();
- }
- else {
- future.cause().printStackTrace();
- }
- }
- }
- );
- // 等待操作完成
- future.await();