设计推送系统


推送系统负责将公开市场的实时信息,包括订单簿、最新成交、最新K线等推送给客户端,对于用户的订单,还需要将成交信息推送给指定用户。FIX(Financial Information eXchange)协议是金融交易的一种实时化通讯协议,但是它非常复杂,而且不同版本的规范也不同。对于Warp Exchange来说,我们先实现一版简单的基于WebSocket推送JSON格式的通知。

和普通Web应用不同的是,基于Servlet的线程池模型不能高效地支持成百上千的WebSocket长连接。Java提供了NIO能充分利用Linux系统的epoll机制高效支持大量的长连接,但是直接使用NIO的接口非常繁琐,通常我们会选择基于NIO的Netty服务器。直接使用Netty其实仍然比较繁琐,基于Netty开发我们可以选择:

  • Spring WebFlux:封装了Netty并实现Reactive接口;
  • Vert.x:封装了Netty并提供简单的API接口。

这里我们选择Vert.x,因为它的API更简单。

Vert.x本身包含若干模块,根据需要,我们引入3个组件:

  1. <dependency>
  2. <groupId>io.vertx</groupId>
  3. <artifactId>vertx-core</artifactId>
  4. <version>${vertx.version}</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>io.vertx</groupId>
  8. <artifactId>vertx-web</artifactId>
  9. <version>${vertx.version}</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>io.vertx</groupId>
  13. <artifactId>vertx-redis-client</artifactId>
  14. <version>${vertx.version}</version>
  15. </dependency>

我们先编写推送服务的入口:

  1. package com.itranswarp.exchange.push;
  2. @SpringBootApplication
  3. // 禁用数据库自动配置 (无DataSource, JdbcTemplate...)
  4. @EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)
  5. public class PushApplication {
  6. public static void main(String[] args) {
  7. System.setProperty("vertx.disableFileCPResolving", "true");
  8. System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory");
  9. SpringApplication app = new SpringApplication(PushApplication.class);
  10. // 禁用Spring的Web:
  11. app.setWebApplicationType(WebApplicationType.NONE);
  12. app.run(args);
  13. }
  14. }

上述代码仍然是一个标准的Spring Boot应用,因为我们希望利用Spring Cloud Config读取配置。由于我们不使用Spring自身的Web功能,因此需要禁用Spring的Web功能。推送服务本身并不需要访问数据库,因此禁用数据库自动配置。最后,我们把PushApplication放在com.itranswarp.exchange.push包下面,以避免自动扫描到com.itranswarp.exchange包下的组件(如RedisService)。

下一步是编写PushService,注意它是一个Spring组件,由Spring初始化:

  1. @Component
  2. public class PushService extends LoggerSupport {
  3. @Value("${server.port}")
  4. private int serverPort;
  5. @Value("${exchange.config.hmac-key}")
  6. String hmacKey;
  7. @Value("${spring.redis.standalone.host:localhost}")
  8. private String redisHost;
  9. @Value("${spring.redis.standalone.port:6379}")
  10. private int redisPort;
  11. @Value("${spring.redis.standalone.password:}")
  12. private String redisPassword;
  13. @Value("${spring.redis.standalone.database:0}")
  14. private int redisDatabase = 0;
  15. private Vertx vertx;
  16. @PostConstruct
  17. public void startVertx() {
  18. // TODO: init Vert.x
  19. }
  20. }

由Spring初始化该组件的目的是注入各种配置。在初始化方法中,我们就可以启动Vert.x:

  1. @PostConstruct
  2. public void startVertx() {
  3. // 启动Vert.x:
  4. this.vertx = Vertx.vertx();
  5. // 创建一个Vert.x Verticle组件:
  6. var push = new PushVerticle(this.hmacKey, this.serverPort);
  7. vertx.deployVerticle(push);
  8. // 连接到Redis:
  9. String url = "redis://" + (this.redisPassword.isEmpty() ? "" : ":" + this.redisPassword + "@") + this.redisHost
  10. + ":" + this.redisPort + "/" + this.redisDatabase;
  11. Redis redis = Redis.createClient(vertx, url);
  12. redis.connect().onSuccess(conn -> {
  13. // 事件处理:
  14. conn.handler(response -> {
  15. // 收到Redis的PUSH:
  16. if (response.type() == ResponseType.PUSH) {
  17. int size = response.size();
  18. if (size == 3) {
  19. Response type = response.get(2);
  20. if (type instanceof BulkType) {
  21. // 收到PUBLISH通知:
  22. String msg = type.toString();
  23. // 由push verticle组件处理该通知:
  24. push.broadcast(msg);
  25. }
  26. }
  27. }
  28. });
  29. // 订阅Redis的Topic:
  30. conn.send(Request.cmd(Command.SUBSCRIBE).arg(RedisCache.Topic.NOTIFICATION)).onSuccess(resp -> {
  31. logger.info("subscribe ok.");
  32. }).onFailure(err -> {
  33. logger.error("subscribe failed.", err);
  34. System.exit(1);
  35. });
  36. }).onFailure(err -> {
  37. logger.error("connect to redis failed.", err);
  38. System.exit(1);
  39. });
  40. }

Vert.x用Verticle表示一个组件,我们编写PushVerticle来处理WebSocket连接:

  1. public class PushVerticle extends AbstractVerticle {
  2. @Override
  3. public void start() {
  4. // 创建VertX HttpServer:
  5. HttpServer server = vertx.createHttpServer();
  6. // 创建路由:
  7. Router router = Router.router(vertx);
  8. // 处理请求 GET /notification:
  9. router.get("/notification").handler(requestHandler -> {
  10. HttpServerRequest request = requestHandler.request();
  11. // 从token参数解析userId:
  12. Supplier<Long> supplier = () -> {
  13. String tokenStr = request.getParam("token");
  14. if (tokenStr != null && !tokenStr.isEmpty()) {
  15. AuthToken token = AuthToken.fromSecureString(tokenStr, this.hmacKey);
  16. if (!token.isExpired()) {
  17. return token.userId();
  18. }
  19. }
  20. return null;
  21. };
  22. final Long userId = supplier.get();
  23. logger.info("parse user id from token: {}", userId);
  24. // 将连接升级到WebSocket:
  25. request.toWebSocket(ar -> {
  26. if (ar.succeeded()) {
  27. initWebSocket(ar.result(), userId);
  28. }
  29. });
  30. });
  31. // 处理请求 GET /actuator/health:
  32. router.get("/actuator/health").respond(
  33. ctx -> ctx.response().putHeader("Content-Type", "application/json").end("{\"status\":\"UP\"}"));
  34. // 其他请求返回404错误:
  35. router.get().respond(ctx -> ctx.response().setStatusCode(404).setStatusMessage("No Route Found").end());
  36. // 绑定路由并监听端口:
  37. server.requestHandler(router).listen(this.serverPort, result -> {
  38. if (result.succeeded()) {
  39. logger.info("Vertx started on port(s): {} (http) with context path ''", this.serverPort);
  40. } else {
  41. logger.error("Start http server failed on port " + this.serverPort, result.cause());
  42. vertx.close();
  43. System.exit(1);
  44. }
  45. });
  46. }
  47. }

PushVerticle中,start()方法由Vert.x回调。我们在start()方法中主要干这么几件事:

  1. 创建基于Vert.x的HTTP服务器(内部使用Netty);
  2. 创建路由;
  3. 绑定一个路径为/notification的GET请求,将其升级为WebSocket连接;
  4. 绑定其他路径的GET请求;
  5. 开始监听指定端口号。

在处理/notification时,我们尝试从URL的token参数解析出用户ID,这样我们就无需访问数据库而获得了当前连接的用户。升级到WebSocket连接后,再调用initWebSocket()继续处理WebSocket连接:

  1. public class PushVerticle extends AbstractVerticle {
  2. // 所有Handler:
  3. Map<String, Boolean> handlersSet = new ConcurrentHashMap<>(1000);
  4. // 用户ID -> Handlers
  5. Map<Long, Set<String>> userToHandlersMap = new ConcurrentHashMap<>(1000);
  6. // Handler -> 用户ID
  7. Map<String, Long> handlerToUserMap = new ConcurrentHashMap<>(1000);
  8. void initWebSocket(ServerWebSocket websocket, Long userId) {
  9. // 获取一个WebSocket关联的Handler ID:
  10. String handlerId = websocket.textHandlerID();
  11. // 处理输入消息:
  12. websocket.textMessageHandler(str -> {
  13. logger.info("text message: " + str);
  14. });
  15. websocket.exceptionHandler(t -> {
  16. logger.error("websocket error: " + t.getMessage(), t);
  17. });
  18. // 关闭连接时:
  19. websocket.closeHandler(e -> {
  20. unsubscribeClient(handlerId);
  21. unsubscribeUser(handlerId, userId);
  22. });
  23. subscribeClient(handlerId);
  24. subscribeUser(handlerId, userId);
  25. }
  26. void subscribeClient(String handlerId) {
  27. this.handlersSet.put(handlerId, Boolean.TRUE);
  28. }
  29. void unsubscribeClient(String handlerId) {
  30. this.handlersSet.remove(handlerId);
  31. }
  32. void subscribeUser(String handlerId, Long userId) {
  33. if (userId == null) {
  34. return;
  35. }
  36. handlerToUserMap.put(handlerId, userId);
  37. Set<String> set = userToHandlersMap.get(userId);
  38. if (set == null) {
  39. set = new HashSet<>();
  40. userToHandlersMap.put(userId, set);
  41. }
  42. set.add(handlerId);
  43. }
  44. void unsubscribeUser(String handlerId, Long userId) {
  45. if (userId == null) {
  46. return;
  47. }
  48. handlerToUserMap.remove(handlerId);
  49. Set<String> set = userToHandlersMap.get(userId);
  50. if (set != null) {
  51. set.remove(handlerId);
  52. }
  53. }
  54. }

在Vert.x中,每个WebSocket连接都有一个唯一的Handler标识,以String表示。我们用几个Map保存Handler和用户ID的映射关系,当关闭连接时,将对应的映射关系删除。

最后一个关键方法broadcast()PushService中订阅的Redis推送时触发,该方法用于向用户主动推送通知:

  1. public void broadcast(String text) {
  2. NotificationMessage message = JsonUtil.readJson(text, NotificationMessage.class);
  3. if (message.userId == null) {
  4. // 没有用户ID时,推送给所有连接:
  5. EventBus eb = vertx.eventBus();
  6. for (String handler : this.handlersSet.keySet()) {
  7. eb.send(handler, text);
  8. }
  9. } else {
  10. // 推送给指定用户:
  11. Set<String> handlers = this.userToHandlersMap.get(message.userId);
  12. if (handlers != null) {
  13. EventBus eb = vertx.eventBus();
  14. for (String handler : handlers) {
  15. eb.send(handler, text);
  16. }
  17. }
  18. }
  19. }

当Redis收到PUBLISH调用后,它自动将String表示的JSON数据推送给所有订阅端。我们在PushService中订阅了notification这个Topic,然后通过broadcast()推送给WebSocket客户端。对于一个NotificationMessage,如果设置了userId,则推送给指定用户,适用于订单成交等针对用户ID的通知;如果没有设置userId,则推送给所有用户,适用于公开市场信息的推送。

整个推送服务仅包括3个Java文件,我们就实现了基于Redis和WebSocket的高性能推送。

参考源码

可以从GitHubGitee下载源码。

GitHubmichaelliaowarpexchange/

▸ build)

▸ sql)

▤ schema.sql)

▤ docker-compose.yml)

▤ pom.xml)

▸ common)

▸ src/main)

▸ java/com/itranswarp/exchange)

▸ bean)

▤ AuthToken.java)

▤ OrderBookBean.java)

▤ OrderBookItemBean.java)

▤ OrderRequestBean.java)

▤ SimpleMatchDetailRecord.java)

▤ TransferRequestBean.java)

▤ ValidatableBean.java)

▸ client)

▤ RestClient.java)

▸ config)

▤ ExchangeConfiguration.java)

▸ ctx)

▤ UserContext.java)

▸ db)

▤ AccessibleProperty.java)

▤ Criteria.java)

▤ CriteriaQuery.java)

▤ DbTemplate.java)

▤ From.java)

▤ Limit.java)

▤ Mapper.java)

▤ OrderBy.java)

▤ Select.java)

▤ Where.java)

▸ enums)

▤ AssetEnum.java)

▤ BarType.java)

▤ ClearingType.java)

▤ Direction.java)

▤ MatchType.java)

▤ OrderStatus.java)

▤ UserType.java)

▸ message)

▸ event)

▤ AbstractEvent.java)

▤ OrderCancelEvent.java)

▤ OrderRequestEvent.java)

▤ TransferEvent.java)

▤ AbstractMessage.java)

▤ ApiResultMessage.java)

▤ NotificationMessage.java)

▤ TickMessage.java)

▸ messaging)

▤ BatchMessageHandler.java)

▤ MessageConsumer.java)

▤ MessageProducer.java)

▤ MessageTypes.java)

▤ Messaging.java)

▤ MessagingConfiguration.java)

▤ MessagingFactory.java)

▸ model)

▸ quotation)

▤ DayBarEntity.java)

▤ HourBarEntity.java)

▤ MinBarEntity.java)

▤ SecBarEntity.java)

▤ TickEntity.java)

▸ support)

▤ AbstractBarEntity.java)

▤ EntitySupport.java)

▸ trade)

▤ ClearingEntity.java)

▤ EventEntity.java)

▤ MatchDetailEntity.java)

▤ OrderEntity.java)

▤ TransferLogEntity.java)

▤ UniqueEventEntity.java)

▸ ui)

▤ ApiKeyAuthEntity.java)

▤ PasswordAuthEntity.java)

▤ UserEntity.java)

▤ UserProfileEntity.java)

▸ redis)

▤ RedisCache.java)

▤ RedisConfiguration.java)

▤ RedisService.java)

▤ SyncCommandCallback.java)

▸ support)

▤ AbstractApiController.java)

▤ AbstractDbService.java)

▤ AbstractFilter.java)

▤ LoggerSupport.java)

▸ user)

▤ UserService.java)

▸ util)

▤ ByteUtil.java)

▤ ClassPathUtil.java)

▤ HashUtil.java)

▤ IdUtil.java)

▤ IpUtil.java)

▤ JsonUtil.java)

▤ RandomUtil.java)

▤ ApiError.java)

▤ ApiErrorResponse.java)

▤ ApiException.java)

▸ resources)

▸ redis)

▤ update-bar.lua)

▤ update-orderbook.lua)

▤ update-recent-ticks.lua)

▤ logback-spring.xml)

▤ pom.xml)

▸ config)

▸ src/main)

▸ java/com/itranswarp/exchange/config)

▤ ConfigApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▸ config-repo)

▤ application-default.yml)

▤ application-test.yml)

▤ application.yml)

▤ push.yml)

▤ quotation.yml)

▤ trading-api.yml)

▤ trading-engine.yml)

▤ trading-sequencer.yml)

▤ ui-default.yml)

▤ ui.yml)

▸ parent)

▤ pom.xml)

▸ push)

▸ src/main)

▸ java/com/itranswarp/exchange/push)

▤ PushApplication.java)

▤ PushService.java)

▤ PushVerticle.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▸ quotation)

▸ src/main)

▸ java/com/itranswarp/exchange)

▸ quotation)

▤ QuotationDbService.java)

▤ QuotationService.java)

▤ QuotationApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▸ trading-api)

▸ src/main)

▸ java/com/itranswarp/exchange)

▸ service)

▤ HistoryService.java)

▤ SendEventService.java)

▤ TradingEngineApiProxyService.java)

▸ web)

▸ api)

▤ TradingApiController.java)

▤ TradingInternalApiController.java)

▤ ApiFilterRegistrationBean.java)

▤ TradingApiApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▸ trading-engine)

▸ src)

▸ main)

▸ java/com/itranswarp/exchange)

▸ assets)

▤ Asset.java)

▤ AssetService.java)

▤ Transfer.java)

▸ clearing)

▤ ClearingService.java)

▸ match)

▤ MatchDetailRecord.java)

▤ MatchEngine.java)

▤ MatchResult.java)

▤ OrderBook.java)

▤ OrderKey.java)

▸ order)

▤ OrderService.java)

▸ store)

▤ StoreService.java)

▸ web/api)

▤ InternalTradingEngineApiController.java)

▤ TradingEngineApplication.java)

▤ TradingEngineService.java)

▸ resources)

▤ application.yml)

▸ test/java/com/itranswarp/exchange)

▸ assets)

▤ AssetServiceTest.java)

▸ match)

▤ MatchEngineTest.java)

▤ TradingEngineServiceTest.java)

▤ pom.xml)

▸ trading-sequencer)

▸ src/main)

▸ java/com/itranswarp/exchange)

▸ sequencer)

▤ SequenceHandler.java)

▤ SequenceService.java)

▤ TradingSequencerApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▸ ui)

▸ src/main)

▸ java/com/itranswarp/exchange)

▤ UIApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▤ .gitignore)

▤ LICENSE)

▤ README.md)

小结

要高效处理大量WebSocket连接,我们选择基于Netty的Vert.x框架,可以通过少量代码配合Redis实现推送。

读后有收获可以支付宝请作者喝咖啡:

设计推送系统 - 图1