Spring Boot整合Thrift RPC

Spring Boot自动配置简介

在介绍RPC之前,我们先来学习下Spring Boot的自动配置。

我们前面已经提到:Spring Boot来源于Spring,并且做了众多改进,其中最有用的设计理念是约定优于配置,它通过自动配置功能(大多数开发者平时习惯设置的配置作为默认配置)为开发者快速、准确地构建出标准化的应用。

以集成MySQL数据库为例,在Spring Boot出现之前,我们要

  1. 配置JDBC驱动依赖
  2. 配置XML文件中数据源
  3. 配置XML中的DataSource Bean
  4. 配置XML中的XXXTemplate Bean
  5. 配置XML中的XXXTransactionManager Bean

有了Spring Boot的自动配置后,自动配置帮我们生成了各种DataSource、XXXTemplate、XXXTransactionManager,我们所需要做的只有一条,就是激活它

  1. maven中依赖包含自动配置的包
  2. 配置JDBC驱动依赖
  3. yaml文件中定义数据源

自动配置进行智能检测,只要满足上述3个条件,其他的Bean都会被自动生成并注入到Spring环境中。我们需要使用时只需要@Autowired一下就可以了,是不是非常简单!

由于篇幅所限,本书不会对自动配置的书写做零起点教学,如果你想了解自动配置的原理,可以参考这篇文章spring boot实战(第十三篇)自动配置原理分析

在本节的后续部分,我们会以Thrift RPC Server为例,看看自动配置是如何书写的。

RPC简介

远程过程调用(remote procedure call或简称RPC),指的是运行于本地(客户端)的程序像调用本地程序一样,直接调用另一台计算机(服务器端)的程序,而程序员无需额外为远程交互做额外的编程。

RPC极大地简化了分布似乎系统中节点之间网络通信的开发工作量,是微服务架构中的重要组件之一。

在本书中,我们选用Thrift作为RPC框架。由于篇幅所限,我们不会对Thrift RPC作出详尽的介绍,如果你还不熟悉,可以参考官方的快速入门文档

Spring Boot整合Thrift RPC服务端

简要来说,启动一个Thrift RPC的服务端需要如下步骤:

  1. 书写DSL(.thrift文件),定义函数、数据结构等。
  2. 编译并生成桩代码。
  3. 编写Handler(RPC的逻辑入口)。
  4. 基于上述Handler,构造Processor。
  5. 构造Server,Thrift提供了多种服务端供选择,常用的有TThreadPoolServer(多线程服务器)和TNonblockingServer(非阻塞服务器)。
  6. 设置Server的Protocol,类似的,Thrift提供了多种传输协议,最常用的是TBinaryProtocol和TCompactProtocol。
  7. 设置Server的Transport(Factory),用这种方式指定底层的传输协议,常用的有TFramedTransport、TNonBlockingTransport,不同的Transport可以类似Java的IOStreawm方式,相互叠加,以产生更强大的效果。

上述对Thrift服务器的架构做了简要介绍,如果想更深入了解,可以自行阅读官方源码

首先,我们来看一下thrift定义(根据上一节的介绍,文件放在lmsia-abc-common包中)

  1. namespace java com.coder4.lmsia.abc
  2. service lmsiaAbcThrift {
  3. string sayHi()
  4. }

调用thrift进行编译后,我们也将对应的桩文件放置在lmsia-abc-client下,目录结构可以参见上一节。

为了更方便的在Spring Boot中集成Thrift服务器,我将相应代码抽取成了公用库lmsia-thrift-server

  1. ├── build.gradle
  2. ├── gradle
  3. └── wrapper
  4. ├── gradle-wrapper.jar
  5. └── gradle-wrapper.properties
  6. ├── gradlew
  7. ├── gradlew.bat
  8. ├── README.md
  9. ├── settings.gradle
  10. └── src
  11. ├── main
  12. ├── java
  13. └── com
  14. └── coder4
  15. └── lmsia
  16. └── thrift
  17. └── server
  18. ├── configuration
  19. └── ThriftServerConfiguration.java
  20. └── ThriftServerRunnable.java
  21. └── resources
  22. └── META-INF
  23. └── spring.factories
  24. └── test
  25. └── java

简单解析下项目结构:gradle相关: 与前节介绍的类似,只不过这里是单项目功能。ThriftServerConfiguration: 自动配置,当满足条件后会自动激活,激活后可自动启动Thrift RPC服务。ThriftServerRunnable: Thrift RPC服务器的构造逻辑、运行线程。spring.factories: 当我们以类库方式提供自动配置时,需要增加这个spring.factories,让别的项目能”定位到”要检查的自动配置。

首先,我们来看一下ThriftServerRunnable.java

  1. package com.coder4.lmsia.thrift.server;
  2. import org.apache.thrift.TProcessor;
  3. import org.apache.thrift.protocol.TBinaryProtocol;
  4. import org.apache.thrift.protocol.TProtocolFactory;
  5. import org.apache.thrift.server.TServer;
  6. import org.apache.thrift.server.TThreadedSelectorServer;
  7. import org.apache.thrift.transport.TFramedTransport;
  8. import org.apache.thrift.transport.TNonblockingServerSocket;
  9. import org.apache.thrift.transport.TNonblockingServerTransport;
  10. import org.apache.thrift.transport.TTransportException;
  11. import org.apache.thrift.transport.TTransportFactory;
  12. import java.util.concurrent.ExecutorService;
  13. import java.util.concurrent.SynchronousQueue;
  14. import java.util.concurrent.ThreadPoolExecutor;
  15. import java.util.concurrent.TimeUnit;
  16. /**
  17. * @author coder4
  18. */
  19. public class ThriftServerRunnable implements Runnable {
  20. private static final int THRIFT_PORT = 3000;
  21. private static final int THRIFT_TIMEOUT = 5000;
  22. private static final int THRIFT_TCP_BACKLOG = 5000;
  23. private static final int THRIFT_CORE_THREADS = 128;
  24. private static final int THRIFT_MAX_THREADS = 256;
  25. private static final int THRIFT_SELECTOR_THREADS = 16;
  26. private static final TProtocolFactory THRIFT_PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
  27. // 16MB
  28. private static final int THRIFT_MAX_FRAME_SIZE = 16 * 1024 * 1024;
  29. // 4MB
  30. private static final int THRIFT_MAX_READ_BUF_SIZE = 4 * 1024 * 1024;
  31. protected ExecutorService threadPool;
  32. protected TServer server;
  33. protected Thread thread;
  34. private TProcessor processor;
  35. private boolean isDestroy = false;
  36. public ThriftServerRunnable(TProcessor processor) {
  37. this.processor = processor;
  38. }
  39. public TServer build() throws TTransportException {
  40. TNonblockingServerSocket.NonblockingAbstractServerSocketArgs socketArgs =
  41. new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs();
  42. socketArgs.port(THRIFT_PORT);
  43. socketArgs.clientTimeout(THRIFT_TIMEOUT);
  44. socketArgs.backlog(THRIFT_TCP_BACKLOG);
  45. TNonblockingServerTransport transport = new TNonblockingServerSocket(socketArgs);
  46. threadPool =
  47. new ThreadPoolExecutor(THRIFT_CORE_THREADS, THRIFT_MAX_THREADS,
  48. 60L, TimeUnit.SECONDS,
  49. new SynchronousQueue<>());
  50. TTransportFactory transportFactory = new TFramedTransport.Factory(THRIFT_MAX_FRAME_SIZE);
  51. TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport)
  52. .selectorThreads(THRIFT_SELECTOR_THREADS)
  53. .executorService(threadPool)
  54. .transportFactory(transportFactory)
  55. .inputProtocolFactory(THRIFT_PROTOCOL_FACTORY)
  56. .outputProtocolFactory(THRIFT_PROTOCOL_FACTORY)
  57. .processor(processor);
  58. args.maxReadBufferBytes = THRIFT_MAX_READ_BUF_SIZE;
  59. return new TThreadedSelectorServer(args);
  60. }
  61. @Override
  62. public void run() {
  63. try {
  64. server = build();
  65. server.serve();
  66. } catch (Exception e) {
  67. e.printStackTrace();
  68. throw new RuntimeException("Start Thrift RPC Server Exception");
  69. }
  70. }
  71. public void stop() throws Exception {
  72. threadPool.shutdown();
  73. server.stop();
  74. }
  75. }

我们来解释一下:

  • build方法用于构造一个可供运行的Thrift RPC Server
    1. 构造非阻塞Socket,并设置监听端口、超时
    2. 构造非阻塞Transport
    3. 构造线程池,在这里我们的服务器模型是非阻塞线程池RPC服务器。
    4. 构造底层传输协议即TFramedTransport
    5. 构造ThriftServer,并设置前面构造的非阻塞Transport、线程池、协议TBinaryProtocol
  • 整个ThriftServerRunnable类是一个线程Runnablerun,run函数中构造RPC服务,并启动服务(servee)
  • stop服务提供停止服务的方法

下面我们来看一下自动配置ThriftServerConfiguration.java:

  1. package com.coder4.lmsia.thrift.server.configuration;
  2. import com.coder4.lmsia.thrift.server.ThriftServerRunnable;
  3. import org.apache.thrift.TProcessor;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.DisposableBean;
  7. import org.springframework.beans.factory.InitializingBean;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
  10. import org.springframework.context.annotation.Configuration;
  11. import java.util.concurrent.TimeUnit;
  12. /**
  13. * @author coder4
  14. */
  15. @Configuration
  16. @ConditionalOnBean(value = {TProcessor.class})
  17. public class ThriftServerConfiguration implements InitializingBean, DisposableBean {
  18. private Logger LOG = LoggerFactory.getLogger(ThriftServerConfiguration.class);
  19. private static final int GRACEFUL_SHOWDOWN_SEC = 3;
  20. @Autowired
  21. private TProcessor processor;
  22. private ThriftServerRunnable thriftServer;
  23. private Thread thread;
  24. @Override
  25. public void destroy() throws Exception {
  26. LOG.info("Wait for graceful shutdown on destroy(), {} seconds", GRACEFUL_SHOWDOWN_SEC);
  27. Thread.sleep(TimeUnit.SECONDS.toMillis(GRACEFUL_SHOWDOWN_SEC));
  28. LOG.info("Shutdown rpc server.");
  29. thriftServer.stop();
  30. thread.join();
  31. }
  32. @Override
  33. public void afterPropertiesSet() throws Exception {
  34. thriftServer = new ThriftServerRunnable(processor);
  35. thread = new Thread(thriftServer);
  36. thread.start();
  37. }
  38. }

这是我们编写的第一个自动配置,我们稍微详细的解释一下:

  • 启动条件: 仅当服务提供了TProcessor才启用,我们稍后会在lmsia-abc项目中看到,后者封装了RPC的桩入口,提供了TProcessor。
  • InitializingBean: 自动配置实现了InitializingBean,为什么要实现这个接口呢?当这个自动配置被初始化时,所有Autowired的属性被自动注入(即Processor),而前面ThriftServerRunnable中我么已经看到,只有拿到了TProcessor,才能启动RPC服务。因此,我们使用了InitializingBean,它自带了afterPropertiesSet这个回调,会在所有属性被注入完成后,调用这个回调函数。
    • 在这里,我们调用了ThriftServerRunnable实现了Thrift RPC服务器的启动。
  • DisposableBean: 除了InitializingBean,我们还实现了DisposableBean。看名字就可以知道,这是Spring为了服务关闭时清理资源而设计的接口。事实也是如此,当服务关闭时,会依次调用每个自动配置,如果实现了DisposableBean,则回调destroy函数。
    • 在这里,我们先让线程休眠3秒,然后才关闭Thrift RPC服务,这主要是为了Graceful Shutdown而设计的(“优雅关闭”),关于这一点,我们会在下一节会做详细讲解。

最后,我们的自动配置默认是无法被发现的,需要一个配置文件spring.factories:

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.coder4.lmsia.thrift.server.configuration.ThriftServerConfiguration

解读完lmsia-thrift-server后,我们看看如何将它整合进lmsia-abc项目中。

  1. 在lmsia-abc-server子项目中的build.gradle中加入:

    1. compile 'com.github.liheyuan:lmsia-thrift-server:0.0.1'
  2. 提供一个TProcessor,如前文所述,这是启用自动配置的必要条件,ThriftProcessorConfiguration:```javapackage com.coder4.lmsia.abc.server.configuration;

import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;import com.coder4.lmsia.abc.server.thrift.ThriftServerHandler;import org.apache.thrift.TProcessor;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;

/**

  • @author coder4*/@Configuration@ConditionalOnProperty(name = “thriftServer.enabled”, matchIfMissing = true)public class ThriftProcessorConfiguration {

    @Bean(name = “thriftProcessor”) public TProcessor processor(ThriftServerHandler handler) {

    1. return new LmsiaAbcThrift.Processor(handler);

    }

}

  1. 我们简单解释下:
  2. * 这也是一个自动配置,仅当配置文件中thriftServer.enabled=true时才启用(不配置默认true)
  3. * 提供的TProcessor,需要依赖ThriftServerHandler,这个就是Thrift生成的桩函数,项目结构分析中已经提到过,这是RPC服务器的逻辑入口。
  4. 怎么样,使用了自动配置后,启动一个Thrift 服务器是不是非常简单?
  5. ## Spring Boot整合Thrift RPC客户端
  6. 只有服务端是不行的,还需要有客户端。
  7. 类似地,为了方便的生成客户端,我们把代码进行了整理和抽象,放到了[lmsia-thrift-client](https://github.com/liheyuan/lmsia-thrift-client)项目中。
  8. 首先看一下项目结构:
  9. ```shell
  10. ├── build.gradle
  11. ├── gradle
  12. │ └── wrapper
  13. │ ├── gradle-wrapper.jar
  14. │ └── gradle-wrapper.properties
  15. ├── gradlew
  16. ├── gradlew.bat
  17. ├── README.md
  18. ├── settings.gradle
  19. └── src
  20. ├── main
  21. │ ├── java
  22. │ │ └── com
  23. │ │ └── coder4
  24. │ │ └── lmsia
  25. │ │ └── thrift
  26. │ │ └── client
  27. │ │ ├── ThriftClient.java
  28. │ │ ├── AbstractThriftClient.java
  29. │ │ ├── EasyThriftClient.java
  30. │ │ ├── K8ServiceThriftClient.java
  31. │ │ ├── K8ServiceKey.java
  32. │ │ ├── builder
  33. │ │ │ ├── EasyThriftClientBuilder.java
  34. │ │ │ └── K8ServiceThriftClientBuilder.java
  35. │ │ ├── func
  36. │ │ │ ├── ThriftCallFunc.java
  37. │ │ │ └── ThriftExecFunc.java
  38. │ │ ├── pool
  39. │ │ │ ├── TTransportPoolFactory.java
  40. │ │ │ └── TTransportPool.java
  41. │ │ └── utils
  42. │ │ └── ThriftUrlStr.java
  43. │ └── resources
  44. └── test
  45. └── java
  46. └── LibraryTest.java

解释下项目结构:

  • gradle相关的与之前类似,不再赘述
  • ThriftClient相关,定义了Thrift的客户端
    1. ThriftClient 抽象了客户端的接口
    2. AbstractThriftClient 实现了除连接外的Thrift Client操作
    3. EasyThriftClient 使用IP和端口直连的Thrift Client
    4. K8ServiceThriftClient 使用Kubernetes服务名字(根据微服务自动发现一节中的介绍,服务名字实际也是Host)和端口的Thrift Client,并内置了连接池。
  • func 函数编程工具类
  • builder 方便快速构造上述两种Thrift Client
  • pool 客户端连接池

本小节主要对IP、端口直连的客户端即EasyThriftClient进行介绍。关于支持服务自动发现以及连接池功能的K8ServiceThriftClient,将在下一节进行介绍。

先看一下接口定义,ThriftClient:

  1. package com.coder4.lmsia.thrift.client;
  2. import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
  3. import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
  4. import org.apache.thrift.TServiceClient;
  5. import java.util.concurrent.Future;
  6. /**
  7. * @author coder4
  8. */
  9. public interface ThriftClient<TCLIENT extends TServiceClient> {
  10. /**
  11. * sync call with return value
  12. * @param tcall thrift rpc client call
  13. * @param <TRET> return type
  14. * @return
  15. */
  16. <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall);
  17. /**
  18. * sync call without return value
  19. * @param texec thrift rpc client
  20. */
  21. void exec(ThriftExecFunc<TCLIENT> texec);
  22. /**
  23. * async call with return value
  24. * @param tcall thrift rpc client call
  25. * @param <TRET>
  26. * @return
  27. */
  28. <TRET> Future<TRET> asyncCall(ThriftCallFunc<TCLIENT, TRET> tcall);
  29. /**
  30. * asnyc call without return value
  31. * @param texec thrift rpc client call
  32. */
  33. <TRET> Future<?> asyncExec(ThriftExecFunc<TCLIENT> texec);
  34. }

这里需要解释一下,上述实际分成了两大类:

  • exec 无返回值的rpc调用
  • call 有返回值的调用

这里使用了Java 8的函数式编程进行抽象。如果不太熟悉的朋友,可以自行查阅相关资料。

在函数式编程的帮助下,我们可以将每一个rpc调用都分为同步和异步两种,异步的调用会返回一个Future。

再来看一下AbstractThriftClient:

  1. /**
  2. * @(#)AbstractThriftClient.java, Aug 01, 2017.
  3. * <p>
  4. * Copyright 2017 fenbi.com. All rights reserved.
  5. * FENBI.COM PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.coder4.lmsia.thrift.client;
  8. import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
  9. import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
  10. import org.apache.thrift.TServiceClient;
  11. import org.apache.thrift.TServiceClientFactory;
  12. import org.apache.thrift.protocol.TBinaryProtocol;
  13. import org.apache.thrift.protocol.TProtocol;
  14. import org.apache.thrift.transport.TTransport;
  15. import java.util.concurrent.ExecutorService;
  16. import java.util.concurrent.Future;
  17. import java.util.concurrent.LinkedBlockingDeque;
  18. import java.util.concurrent.ThreadPoolExecutor;
  19. import java.util.concurrent.TimeUnit;
  20. /**
  21. * @author coder4
  22. */
  23. public abstract class AbstractThriftClient<TCLIENT extends TServiceClient> implements ThriftClient<TCLIENT> {
  24. protected static final int THRIFT_CLIENT_DEFAULT_TIMEOUT = 5000;
  25. protected static final int THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 16;
  26. private Class<?> thriftClass;
  27. private static final TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
  28. private TServiceClientFactory<TCLIENT> clientFactory;
  29. // For async call
  30. private ExecutorService threadPool;
  31. public void init() {
  32. try {
  33. clientFactory = getThriftClientFactoryClass().newInstance();
  34. } catch (Exception e) {
  35. throw new RuntimeException();
  36. }
  37. if (!check()) {
  38. throw new RuntimeException("Client config failed check!");
  39. }
  40. threadPool = new ThreadPoolExecutor(
  41. 10, 100, 0,
  42. TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>());
  43. }
  44. protected boolean check() {
  45. if (thriftClass == null) {
  46. return false;
  47. }
  48. return true;
  49. }
  50. @Override
  51. public <TRET> Future<TRET> asyncCall(ThriftCallFunc<TCLIENT, TRET> tcall) {
  52. return threadPool.submit(() -> this.call(tcall));
  53. }
  54. @Override
  55. public <TRET> Future<?> asyncExec(ThriftExecFunc<TCLIENT> texec) {
  56. return threadPool.submit(() -> this.exec(texec));
  57. }
  58. protected TCLIENT createClient(TTransport transport) throws Exception {
  59. // Step 1: get TProtocol
  60. TProtocol protocol = protocolFactory.getProtocol(transport);
  61. // Step 2: get client
  62. return clientFactory.getClient(protocol);
  63. }
  64. private Class<TServiceClientFactory<TCLIENT>> getThriftClientFactoryClass() {
  65. Class<TCLIENT> clientClazz = getThriftClientClass();
  66. if (clientClazz == null) {
  67. return null;
  68. }
  69. for (Class<?> clazz : clientClazz.getDeclaredClasses()) {
  70. if (TServiceClientFactory.class.isAssignableFrom(clazz)) {
  71. return (Class<TServiceClientFactory<TCLIENT>>) clazz;
  72. }
  73. }
  74. return null;
  75. }
  76. private Class<TCLIENT> getThriftClientClass() {
  77. for (Class<?> clazz : thriftClass.getDeclaredClasses()) {
  78. if (TServiceClient.class.isAssignableFrom(clazz)) {
  79. return (Class<TCLIENT>) clazz;
  80. }
  81. }
  82. return null;
  83. }
  84. public void setThriftClass(Class<?> thriftClass) {
  85. this.thriftClass = thriftClass;
  86. }
  87. }

上述抽象的Thrift客户端实现了如下功能:

  1. 客户端线程池,这里主要是为异步调用准备的,与之前构造的服务端的线程池是完全不同的。
    • asyncCall和asyncExec使用了线程池来完成异步调用
  2. thriftClass 存储了Thrift的桩代码了类,不同业务生成的ThriftClass不一样,所以这里存储了class。
  3. createClient提供了共用函数,传入一个transport,即可构造生成一个Thrift Client,特别注意的是,这里设定的通信协议为TBinaryProtocol,必须与服务端保持一致,否则无法成功通信。

由于call和exec与连接实现较为相关,因此并未在这一层中实现,最后我们来看一下EasyThriftClient:

  1. package com.coder4.lmsia.thrift.client;
  2. import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
  3. import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
  4. import org.apache.thrift.TServiceClient;
  5. import org.apache.thrift.transport.TFramedTransport;
  6. import org.apache.thrift.transport.TSocket;
  7. import org.apache.thrift.transport.TTransport;
  8. /**
  9. * @author coder4
  10. */
  11. public class EasyThriftClient<TCLIENT extends TServiceClient> extends AbstractThriftClient<TCLIENT> {
  12. private static final int EASY_THRIFT_BUFFER_SIZE = 1024 * 16;
  13. protected String thriftServerHost;
  14. protected int thriftServerPort;
  15. @Override
  16. protected boolean check() {
  17. if (thriftServerHost == null || thriftServerHost.isEmpty()) {
  18. return false;
  19. }
  20. if (thriftServerPort <= 0) {
  21. return false;
  22. }
  23. return super.check();
  24. }
  25. private TTransport borrowTransport() throws Exception {
  26. TSocket socket = new TSocket(thriftServerHost, thriftServerPort, THRIFT_CLIENT_DEFAULT_TIMEOUT);
  27. TTransport transport = new TFramedTransport(
  28. socket, THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE);
  29. transport.open();
  30. return transport;
  31. }
  32. private void returnTransport(TTransport transport) {
  33. if (transport != null && transport.isOpen()) {
  34. transport.close();
  35. }
  36. }
  37. private void returnBrokenTransport(TTransport transport) {
  38. if (transport != null && transport.isOpen()) {
  39. transport.close();
  40. }
  41. }
  42. @Override
  43. public <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall) {
  44. // Step 1: get TTransport
  45. TTransport tpt = null;
  46. try {
  47. tpt = borrowTransport();
  48. } catch (Exception e) {
  49. throw new RuntimeException(e);
  50. }
  51. // Step 2: get client & call
  52. try {
  53. TCLIENT tcli = createClient(tpt);
  54. TRET ret = tcall.call(tcli);
  55. returnTransport(tpt);
  56. return ret;
  57. } catch (Exception e) {
  58. returnBrokenTransport(tpt);
  59. throw new RuntimeException(e);
  60. }
  61. }
  62. @Override
  63. public void exec(ThriftExecFunc<TCLIENT> texec) {
  64. // Step 1: get TTransport
  65. TTransport tpt = null;
  66. try {
  67. tpt = borrowTransport();
  68. } catch (Exception e) {
  69. throw new RuntimeException(e);
  70. }
  71. // Step 2: get client & exec
  72. try {
  73. TCLIENT tcli = createClient(tpt);
  74. texec.exec(tcli);
  75. returnTransport(tpt);
  76. } catch (Exception e) {
  77. returnBrokenTransport(tpt);
  78. throw new RuntimeException(e);
  79. }
  80. }
  81. public String getThriftServerHost() {
  82. return thriftServerHost;
  83. }
  84. public void setThriftServerHost(String thriftServerHost) {
  85. this.thriftServerHost = thriftServerHost;
  86. }
  87. public int getThriftServerPort() {
  88. return thriftServerPort;
  89. }
  90. public void setThriftServerPort(int thriftServerPort) {
  91. this.thriftServerPort = thriftServerPort;
  92. }

简单解释下上述代码

  1. 需要外部传入RPC服务器的主机名和端口 thriftServerHost和thriftServerPort
  2. borrowTransport完成Transport(Thrift中类似Socket的抽象) 的构造,注意这里要使用TFramedTransport,与之前服务端的构造保持一致。
  3. returnTransport关闭Transport
  4. returnBrokenTransport关闭出异常的Transport
  5. call和exec 在拿到Transport后,使用函数式编程的方式,完成rpc调用,如果有异常则关闭连接。

最后我们来看一下对应的Builder,EasyThriftClientBuilder:

  1. package com.coder4.lmsia.thrift.client.builder;
  2. import com.coder4.lmsia.thrift.client.EasyThriftClient;
  3. import org.apache.thrift.TServiceClient;
  4. /**
  5. * @author coder4
  6. */
  7. public class EasyThriftClientBuilder<TCLIENT extends TServiceClient> {
  8. private final EasyThriftClient<TCLIENT> client = new EasyThriftClient<>();
  9. protected EasyThriftClient<TCLIENT> build() {
  10. client.init();
  11. return client;
  12. }
  13. protected EasyThriftClientBuilder<TCLIENT> setHost(String host) {
  14. client.setThriftServerHost(host);
  15. return this;
  16. }
  17. protected EasyThriftClientBuilder<TCLIENT> setPort(int port) {
  18. client.setThriftServerPort(port);
  19. return this;
  20. }
  21. protected EasyThriftClientBuilder<TCLIENT> setThriftClass(Class<?> thriftClass) {
  22. client.setThriftClass(thriftClass);
  23. return this;
  24. }
  25. }

Builder的代码比较简单,就是以链式调用的方式,通过主机和端口,方便地构造一个EasyThriftClient。

看了EasyThriftClient后下面我们来看一下如何集成到项目中。

Gradle子项目划分与微服务的代码结构一节中,我们已经提到,将每个微服务的RPC客户端放在xx-client子工程中,现在我们再来回顾下lmsia-abc-client的目录结构。

  1. ├── build.gradle
  2. └── src
  3. ├── main
  4. ├── java
  5. └── com
  6. └── coder4
  7. └── lmsia
  8. └── abc
  9. └── client
  10. ├── configuration
  11. └── LmsiaAbcThriftClientConfiguration.java
  12. ├── LmsiaAbcEasyThriftClientBuilder.java
  13. └── LmsiaK8ServiceThriftClientBuilder.java
  14. └── resources
  15. └── META-INF
  16. └── spring.factories
  17. └── test

我们简单介绍一下:

  1. LmsiaAbcThriftClientConfiguration: 客户端自动配置,当激活时,自动生成lmsia-abc对应的RPC服务的客户端。引用者直接@Autowired一下,就可以使用了。
  2. LmsiaAbcEasyThriftClientBuilder: EasyThriftClient构造器,主要是自动配置需要。
  3. spring.factories: 与服务端的自动配置类似,需要在这个文件中指定自动配置的类路径,才能让Spring Boot自动扫描到自动配置。
  4. 其他K8ServiceThriftClient相关的部分,我们将在下一小节进行介绍。

LmsiaAbcEasyThriftClientBuilder文件:

  1. package com.coder4.lmsia.abc.client;
  2. import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;
  3. import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client;
  4. import com.coder4.lmsia.thrift.client.ThriftClient;
  5. import com.coder4.lmsia.thrift.client.builder.EasyThriftClientBuilder;
  6. /**
  7. * @author coder4
  8. */
  9. public class LmsiaAbcEasyThriftClientBuilder extends EasyThriftClientBuilder<Client> {
  10. public LmsiaAbcEasyThriftClientBuilder(String host, int port) {
  11. setThriftClass(LmsiaAbcThrift.class);
  12. setHost(host);
  13. setPort(port);
  14. }
  15. public static ThriftClient<Client> buildClient(String host, int port) {
  16. return new LmsiaAbcEasyThriftClientBuilder(host, port).build();
  17. }
  18. }

上述Builder完成了实际的参数填充,主要有:

  1. ThriftClient的桩代码类设置(LmsiaAbcThrift.class)
  2. 设置主机名和端口

LmsiaAbcClientConfiguration文件:

  1. package com.coder4.lmsia.abc.client.configuration;
  2. import com.coder4.lmsia.abc.client.LmsiaAbcEasyThriftClientBuilder;
  3. import com.coder4.lmsia.abc.client.LmsiaK8ServiceClientBuilder;
  4. import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;
  5. import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client;
  6. import com.coder4.lmsia.thrift.client.K8ServiceKey;
  7. import com.coder4.lmsia.thrift.client.ThriftClient;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.beans.factory.annotation.Qualifier;
  11. import org.springframework.beans.factory.annotation.Value;
  12. import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
  13. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  14. import org.springframework.context.annotation.Bean;
  15. import org.springframework.context.annotation.Configuration;
  16. import org.springframework.context.annotation.Profile;
  17. @Configuration
  18. public class LmsiaAbcThriftClientConfiguration {
  19. private Logger LOG = LoggerFactory.getLogger(getClass());
  20. @Bean(name = "lmsiaAbcThriftClient")
  21. @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient")
  22. @ConditionalOnProperty(name = {"lmsiaAbcThriftServer.host", "lmsiaAbcThriftServer.port"})
  23. public ThriftClient<Client> easyThriftClient(
  24. @Value("${lmsiaAbcThriftServer.host}") String host,
  25. @Value("${lmsiaAbcThriftServer.port}") int port
  26. ) {
  27. LOG.info("######## LmsiaAbcClientConfiguration ########");
  28. LOG.info("easyClient host = {}, port = {}", host, port);
  29. return LmsiaAbcEasyThriftClientBuilder.buildClient(host, port);
  30. }
  31. }

如上所示,满足两个条件时,会自动构造LmsiaAbcEasyThriftClient:

  1. 还没有生成其他的LmsiaAbcEasyThriftClient(ConditionalOnMissingBean)
  2. 配置中指定了lmsiaAbcThriftServer.host和lmsiaAbcThriftServer.port

根据我们前面的介绍,大家应该能理解,虽然有自动配置,但上述配置是一种很糟糕的方式。试想一下,如果我们的服务依赖了5个其他RPC服务,那么岂不是要分别配置5组IP和端口?此外,这种方式也无法支持节点的负载均衡。

如何解决这个问题呢?我们将在K8ServiceThriftClient中解决。

本小节的最后,我们看一下spring.factories:

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.coder4.lmsia.abc.client.configuration.LmsiaAbcThriftClientConfiguration

和之前lmsia-abc-server子工程中的文件类似,这里设置了自动配置的详细类路径,方便Spring Boot的自动扫描。

K8ServiceThriftClient

在对EasyThriftClient的介绍中,我们发现了一个问题,需要单独配置IP和端口,不支持服务自动发现。

此外,在这个客户端的实现中,默认每次都要建立新的连接。而对于后端服务而言,RPC的服务端和客户端多数都是在内网环境中,连接情况比较稳定,可以通过连接池的方式减少连接握手开销,从而提升RPC服务的性能。如果你对连接池的原理还不太熟悉,可以参考百科连接池

为此,我们本将介绍K8ServiceThriftClient,它很好的解决了上述问题。

首先,我们使用commons-pool2来构建了TTransport层的连接池。

TTransportPoolFactory:

  1. package com.coder4.lmsia.thrift.client.pool;
  2. import com.coder4.lmsia.thrift.client.K8ServiceKey;
  3. import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
  4. import org.apache.commons.pool2.PooledObject;
  5. import org.apache.commons.pool2.impl.DefaultPooledObject;
  6. import org.apache.thrift.transport.TFramedTransport;
  7. import org.apache.thrift.transport.TSocket;
  8. import org.apache.thrift.transport.TTransport;
  9. /**
  10. * @author coder4
  11. */
  12. public class TTransportPoolFactory extends BaseKeyedPooledObjectFactory<K8ServiceKey, TTransport> {
  13. protected static final int THRIFT_CLIENT_DEFAULT_TIMEOUT = 5000;
  14. protected static final int THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 16;
  15. @Override
  16. public TTransport create(K8ServiceKey key) throws Exception {
  17. if (key != null) {
  18. String host = key.getK8ServiceHost();
  19. int port = key.getK8ServicePort();
  20. TSocket socket = new TSocket(host, port, THRIFT_CLIENT_DEFAULT_TIMEOUT);
  21. TTransport transport = new TFramedTransport(
  22. socket, THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE);
  23. transport.open();
  24. return transport;
  25. } else {
  26. return null;
  27. }
  28. }
  29. @Override
  30. public PooledObject<TTransport> wrap(TTransport transport) {
  31. return new DefaultPooledObject<>(transport);
  32. }
  33. @Override
  34. public void destroyObject(K8ServiceKey key, PooledObject<TTransport> obj) throws Exception {
  35. obj.getObject().close();
  36. }
  37. @Override
  38. public boolean validateObject(K8ServiceKey key, PooledObject<TTransport> obj) {
  39. return obj.getObject().isOpen();
  40. }
  41. }

上述代码主要完成以下功能:

  1. 连接超时配置(5秒)
  2. create, 生成新连接(TTransport),这里与之前的EasyThriftClient非常类似,不再赘述
  3. 验证连接是否有效,通过TTransport的isOpen判断。

TTransportPool:

  1. package com.coder4.lmsia.thrift.client.pool;
  2. import com.coder4.lmsia.thrift.client.K8ServiceKey;
  3. import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
  4. import org.apache.thrift.transport.TTransport;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. /**
  8. * @author coder4
  9. */
  10. public class TTransportPool extends GenericKeyedObjectPool<K8ServiceKey, TTransport> {
  11. private Logger LOG = LoggerFactory.getLogger(getClass());
  12. private static int MAX_CONN = 1024;
  13. private static int MIN_IDLE_CONN = 8;
  14. private static int MAX_IDLE_CONN = 32;
  15. public TTransportPool(TTransportPoolFactory factory) {
  16. super(factory);
  17. setTimeBetweenEvictionRunsMillis(45 * 1000);
  18. setNumTestsPerEvictionRun(5);
  19. setMaxWaitMillis(30 * 1000);
  20. setMaxTotal(MAX_CONN);
  21. setMaxTotalPerKey(MAX_CONN);
  22. setMinIdlePerKey(MIN_IDLE_CONN);
  23. setMaxTotalPerKey(MAX_IDLE_CONN);
  24. setTestOnCreate(true);
  25. setTestOnBorrow(true);
  26. setTestWhileIdle(true);
  27. }
  28. @Override
  29. public TTransportPoolFactory getFactory() {
  30. return (TTransportPoolFactory) super.getFactory();
  31. }
  32. public void returnBrokenObject(K8ServiceKey key, TTransport transport) {
  33. try {
  34. invalidateObject(key, transport);
  35. } catch (Exception e) {
  36. LOG.warn("return broken key " + key);
  37. e.printStackTrace();
  38. }
  39. }
  40. }

上述代码主要是完成连接池的配置,比较直观:

  1. 设置最大连接数1024
  2. 设置最大空闲数32,最小空闲数8,每间隔45秒尝试更改维护连接池中的连接数量。
  3. 当每次”创建”、从池子中”借用”、”空闲”时,检查连接是否有效。

下面我们来看一下如何在K8ServiceThriftClient中使用:

  1. package com.coder4.lmsia.thrift.client;
  2. import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
  3. import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
  4. import com.coder4.lmsia.thrift.client.pool.TTransportPool;
  5. import com.coder4.lmsia.thrift.client.pool.TTransportPoolFactory;
  6. import org.apache.thrift.TServiceClient;
  7. import org.apache.thrift.transport.TTransport;
  8. public class K8ServiceThriftClient<TCLIENT extends TServiceClient>
  9. extends AbstractThriftClient<TCLIENT> {
  10. private K8ServiceKey k8ServiceKey;
  11. private TTransportPool connPool;
  12. @Override
  13. public void init() {
  14. super.init();
  15. // check
  16. if (k8ServiceKey == null) {
  17. throw new RuntimeException("invalid k8ServiceName or k8Serviceport");
  18. }
  19. // init pool
  20. connPool = new TTransportPool(new TTransportPoolFactory());
  21. }
  22. @Override
  23. public <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall) {
  24. // Step 1: get TTransport
  25. TTransport tpt = null;
  26. K8ServiceKey key = getConnBorrowKey();
  27. try {
  28. tpt = connPool.borrowObject(key);
  29. } catch (Exception e) {
  30. throw new RuntimeException(e);
  31. }
  32. // Step 2: get client & call
  33. try {
  34. TCLIENT tcli = createClient(tpt);
  35. TRET ret = tcall.call(tcli);
  36. returnTransport(key, tpt);
  37. return ret;
  38. } catch (Exception e) {
  39. returnBrokenTransport(key, tpt);
  40. throw new RuntimeException(e);
  41. }
  42. }
  43. @Override
  44. public void exec(ThriftExecFunc<TCLIENT> texec) {
  45. // Step 1: get TTransport
  46. TTransport tpt = null;
  47. K8ServiceKey key = getConnBorrowKey();
  48. try {
  49. // borrow transport
  50. tpt = connPool.borrowObject(key);
  51. } catch (Exception e) {
  52. throw new RuntimeException(e);
  53. }
  54. // Step 2: get client & exec
  55. try {
  56. TCLIENT tcli = createClient(tpt);
  57. texec.exec(tcli);
  58. returnTransport(key, tpt);
  59. } catch (Exception e) {
  60. returnBrokenTransport(key, tpt);
  61. throw new RuntimeException(e);
  62. }
  63. }
  64. private K8ServiceKey getConnBorrowKey() {
  65. return k8ServiceKey;
  66. }
  67. private void returnTransport(K8ServiceKey key, TTransport transport) {
  68. connPool.returnObject(key, transport);
  69. }
  70. private void returnBrokenTransport(K8ServiceKey key, TTransport transport) {
  71. connPool.returnBrokenObject(key, transport);
  72. }
  73. public K8ServiceKey getK8ServiceKey() {
  74. return k8ServiceKey;
  75. }
  76. public void setK8ServiceKey(K8ServiceKey k8ServiceKey) {
  77. this.k8ServiceKey = k8ServiceKey;
  78. }
  79. }

上述大部分代码和EasyThriftClient非常接近,有差异的部分主要是与连接的”借用”、”归还”相关的:

  1. 在call和exec中,借用连接
    • getConnBorrowKey先构造一个key,包含了主机名和端口。这里的主机名是微服务的自动发现中提到的Kubernetes服务,如果你对相关原理不太熟悉,可以自行回顾对应章节。
    • 从connPool中借用一个连接(TTransport)
    • 剩余发起rpc调用的步骤就和EasyThriftClient相同了,不再赘述。
  2. 当rpc调用结束后
    • 正常结束,调用connPool.returnObject将TTransport归还到连接池中。
    • 非正常结束,调用connPool.returnBrokenTransport,让连接池销毁这个连接,以防后续借用到这个可能出错的TTransport。

类似的,我们也配套了对应的Builder:

  1. package com.coder4.lmsia.thrift.client.builder;
  2. import com.coder4.lmsia.thrift.client.EasyThriftClient;
  3. import org.apache.thrift.TServiceClient;
  4. /**
  5. * @author coder4
  6. */
  7. public class EasyThriftClientBuilder<TCLIENT extends TServiceClient> {
  8. private final EasyThriftClient<TCLIENT> client = new EasyThriftClient<>();
  9. protected EasyThriftClient<TCLIENT> build() {
  10. client.init();
  11. return client;
  12. }
  13. protected EasyThriftClientBuilder<TCLIENT> setHost(String host) {
  14. client.setThriftServerHost(host);
  15. return this;
  16. }
  17. protected EasyThriftClientBuilder<TCLIENT> setPort(int port) {
  18. client.setThriftServerPort(port);
  19. return this;
  20. }
  21. protected EasyThriftClientBuilder<TCLIENT> setThriftClass(Class<?> thriftClass) {
  22. client.setThriftClass(thriftClass);
  23. return this;
  24. }
  25. }

上述Builder主要是设置所需的两个参数,Host和Port,看起来和EasyThriftClient并没有什么不同?

别着急,我们继续看一下lmsia-abc-client中的集成:

  1. package com.coder4.lmsia.abc.client;
  2. import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;
  3. import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client;
  4. import com.coder4.lmsia.thrift.client.K8ServiceKey;
  5. import com.coder4.lmsia.thrift.client.ThriftClient;
  6. import com.coder4.lmsia.thrift.client.builder.K8ServiceThriftClientBuilder;
  7. /**
  8. * @author coder4
  9. */
  10. public class LmsiaK8ServiceThriftClientBuilder extends K8ServiceThriftClientBuilder<Client> {
  11. public LmsiaK8ServiceThriftClientBuilder(K8ServiceKey k8ServiceKey) {
  12. setThriftClass(LmsiaAbcThrift.class);
  13. setK8ServiceKey(k8ServiceKey);
  14. }
  15. public static ThriftClient<Client> buildClient(K8ServiceKey k8ServiceKey) {
  16. return new LmsiaK8ServiceThriftClientBuilder(k8ServiceKey).build();
  17. }
  18. }

在集成的时候,我们需要传入一个key,可以手动制定,也可以自动配置

我们看一下完整的自动配置代码,LmsiaAbcThriftClientConfiguration:

  1. public class LmsiaAbcThriftClientConfiguration {
  2. @Bean(name = "lmsiaAbcThriftClient")
  3. @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient")
  4. @ConditionalOnProperty(name = {"lmsiaAbcThriftServer.host", "lmsiaAbcThriftServer.port"})
  5. public ThriftClient<Client> easyThriftClient(
  6. @Value("${lmsiaAbcThriftServer.host}") String host,
  7. @Value("${lmsiaAbcThriftServer.port}") int port
  8. ) {
  9. LOG.info("######## LmsiaAbcThriftClientConfiguration ########");
  10. LOG.info("easyThriftClient host = {}, port = {}", host, port);
  11. return LmsiaAbcEasyThriftClientBuilder.buildClient(host, port);
  12. }
  13. @Bean(name = "lmsiaAbcThriftClient")
  14. @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient")
  15. public ThriftClient<LmsiaAbcThrift.Client> k8ServiceThriftClient() {
  16. LOG.info("######## LmsiaAbcThriftClientConfiguration ########");
  17. K8ServiceKey k8ServiceKey = new K8ServiceKey(K8_SERVICE_NAME, K8_SERVICE_PORT);
  18. LOG.info("k8ServiceThriftClient key:" + k8ServiceKey);
  19. return LmsiaK8ServiceThriftClientBuilder.buildClient(k8ServiceKey);
  20. }
  21. //...
  22. }

对比easyThriftClient和k8ServiceThriftClient不难发现,K8ServiceThriftClient的参数,是通过常量直接写死的。也就是我们在微服务的自动发现与负载均衡中提到的,约定好服务的命名规则。

看下常量定义:

  1. public class LmsiaAbcConstant {
  2. // ......
  3. public static final String PROJECT_NAME = "lmsia-abc";
  4. public static final String K8_SERVICE_NAME = PROJECT_NAME + "-server";
  5. public static final int K8_SERVICE_PORT = 3000;
  6. // ......
  7. }

这样以来,一旦确定了项目名,那么Kubernetes中的服务名字也确定了。因此,k8ServiceThriftClient自动配置会被自动激活,即只要引用了lmsia-abc-client这个包,就会自动配置好一个RPC客户端,是不是非常方便?

我们来看一下具体的使用例子:

  1. import com.coder4.lmsia.thrift.client.ThriftClient;
  2. public class LmsiaAbctProxy {
  3. @Autowired
  4. private ThriftClient<Client> client;
  5. public String hello() {
  6. return client.call(cli -> cli.sayHi());
  7. }

至此,我们已经完成了在Spring Boo中集成Thrift RPC的服务端、客户端的工作。

  • 服务端,我们通过ThriftServerConfiguration、ThriftProcessorConfiguration自动配置了Thrift RPC服务端。
  • 客户端,通过Kubernetes的服务功能,自动配置了带服务发现功能的Thrift RPC客户端K8ServiceThriftClient。该客户端同时内置了连接池,用于节省连接开销。