完成交易引擎


我们现在实现了资产模块、订单模块、撮合引擎和清算模块,现在,就可以把它们组合起来,实现一个完整的交易引擎:

  1. public class TradingEngineService {
  2. @Autowired
  3. AssetService assetService;
  4. @Autowired
  5. OrderService orderService;
  6. @Autowired
  7. MatchEngine matchEngine;
  8. @Autowired
  9. ClearingService clearingService;
  10. }

交易引擎由事件驱动,因此,通过订阅Kafka的Topic实现批量读消息,然后依次处理每个事件:

  1. void processMessages(List<AbstractEvent> messages) {
  2. for (AbstractEvent message : messages) {
  3. processEvent(message);
  4. }
  5. }
  6. void processEvent(AbstractEvent event) {
  7. if (event instanceof OrderRequestEvent) {
  8. createOrder((OrderRequestEvent) event);
  9. } else if (event instanceof OrderCancelEvent) {
  10. cancelOrder((OrderCancelEvent) event);
  11. } else if (event instanceof TransferEvent) {
  12. transfer((TransferEvent) event);
  13. }
  14. }

我们目前一共有3种类型的事件,处理都非常简单。以createOrder()为例,核心代码其实就几行:

  1. void createOrder(OrderRequestEvent event) {
  2. // 生成Order ID:
  3. long orderId = event.sequenceId * 10000 + (year * 100 + month);
  4. // 创建Order:
  5. OrderEntity order = orderService.createOrder(event.sequenceId, event.createdAt, orderId, event.userId, event.direction, event.price, event.quantity);
  6. if (order == null) {
  7. logger.warn("create order failed.");
  8. return;
  9. }
  10. // 撮合:
  11. MatchResult result = matchEngine.processOrder(event.sequenceId, order);
  12. // 清算:
  13. clearingService.clearMatchResult(result);
  14. }

核心的业务逻辑并不复杂,只是交易引擎在处理完订单后,仅仅改变自身状态是不够的,它还得向外输出具体的成交信息、订单状态等。因此,需要根据业务需求,在清算后继续收集撮合结果、已完成订单、准备发送的通知等,通过消息系统或Redis向外输出交易信息。如果把这些功能放到同一个线程内同步完成是非常耗时的,更好的方法是把它们先存储起来,再异步处理。例如,对于已完成的订单,可以异步落库:

  1. Queue<List<OrderEntity>> orderQueue = new ConcurrentLinkedQueue<>();
  2. void createOrder(OrderRequestEvent event) {
  3. ...
  4. // 清算完成后,收集已完成Order:
  5. if (!result.matchDetails.isEmpty()) {
  6. List<OrderEntity> closedOrders = new ArrayList<>();
  7. if (result.takerOrder.status.isFinalStatus) {
  8. closedOrders.add(result.takerOrder);
  9. }
  10. for (MatchDetailRecord detail : result.matchDetails) {
  11. OrderEntity maker = detail.makerOrder();
  12. if (maker.status.isFinalStatus) {
  13. closedOrders.add(maker);
  14. }
  15. }
  16. this.orderQueue.add(closedOrders);
  17. }
  18. }
  19. // 启动一个线程将orderQueue的Order异步写入数据库:
  20. void saveOrders() {
  21. // TODO:
  22. }

类似的,输出OrderBook、通知用户成交等信息都是异步处理。

接下来,我们再继续完善processEvent(),处理单个事件时,在处理具体的业务逻辑之前,我们首先根据sequenceId判断是否是重复消息,是重复消息就丢弃:

  1. void processEvent(AbstractEvent event) {
  2. if (event.sequenceId <= this.lastSequenceId) {
  3. logger.warn("skip duplicate event: {}", event);
  4. return;
  5. }
  6. // TODO:
  7. }

紧接着,我们判断是否丢失了消息,如果丢失了消息,就根据上次处理的消息的sequenceId,从数据库里捞出后续消息,直到赶上当前消息的sequenceId为止:

  1. // 判断是否丢失了消息:
  2. if (event.previousId > this.lastSequenceId) {
  3. // 从数据库读取丢失的消息:
  4. List<AbstractEvent> events = storeService.loadEventsFromDb(this.lastSequenceId);
  5. if (events.isEmpty()) {
  6. // 读取失败:
  7. System.exit(1);
  8. return;
  9. }
  10. // 处理丢失的消息:
  11. for (AbstractEvent e : events) {
  12. this.processEvent(e);
  13. }
  14. return;
  15. }
  16. // 判断当前消息是否指向上一条消息:
  17. if (event.previousId != lastSequenceId) {
  18. System.exit(1);
  19. return;
  20. }
  21. // 正常处理:
  22. ...
  23. // 更新lastSequenceId:
  24. this.lastSequenceId = event.sequenceId;

这样一来,我们对消息系统的依赖就不是要求它100%可靠,遇到重复消息、丢失消息,交易引擎都可以从这些错误中自己恢复。

由于资产、订单、撮合、清算都在内存中完成,如何保证交易引擎每处理一个事件,它的内部状态都是正确的呢?我们可以为交易引擎增加一个自验证功能,在debug模式下,每处理一个事件,就自动验证内部状态的完整性,包括:

  • 验证资产系统总额为0,且除负债账户外其余账户资产不为负;
  • 验证订单系统未成交订单所冻结的资产与资产系统中的冻结一致;
  • 验证订单系统的订单与撮合引擎的订单簿一对一存在。
  1. void processEvent(AbstractEvent event) {
  2. ...
  3. if (debugMode) {
  4. this.validate();
  5. }
  6. }

这样我们就能快速在开发阶段尽可能早地发现问题。

交易引擎的测试也相对比较简单。对于同一组输入,每次运行都会得到相同的结果,所以我们可以构造几组确定的输入来验证交易引擎:

  1. class TradingEngineServiceTest {
  2. @Test
  3. public void testTradingEngine() {
  4. // TODO:
  5. }
  6. }

下面是问题解答。

交易引擎崩溃后如何恢复?

交易引擎如果运行时崩溃,可以重启,重启后先把现有的所有交易事件重头开始执行一遍,即可得到最新的状态。

注意到重头开始执行交易事件,会导致重复发出市场成交、用户订单通知等事件,因此,可根据时间做判断,不再重复发通知。下游系统在处理通知事件时,也要根据通知携带的sequenceId做去重判断。

有的童鞋会问,如果现有的交易事件已经有几千万甚至几十亿,从头开始执行如果需要花费几个小时甚至几天,怎么办?

可以定期把交易引擎的状态序列化至文件系统,例如,每10分钟一次。当交易引擎崩溃时,读取最新的状态文件,即可恢复至约10分钟前的状态,后续追赶只需要执行很少的事件消息。

如何序列化交易引擎的状态?

交易引擎的状态包括:

  • 资产系统的状态:即所有用户的资产列表;
  • 订单系统的状态:即所有活动订单列表;
  • 撮合引擎的状态:即买卖盘和最新市场价;
  • 最后一次处理的sequenceId。

序列化时,分别针对每个子系统进行序列化。对资产系统来说,每个用户的资产可序列化为用户ID: [USD可用, USD冻结, BTC可用, BTC冻结]的JSON格式,整个资产系统序列化后结构如下:

  1. {
  2. "1": [-123000, 0, -12.3, 0],
  3. "100": [60000, 20000, 9, 0],
  4. "200": [43000, 0, 3, 0.3]
  5. }

订单系统可序列化为一系列活动订单列表:

  1. [
  2. { "id": 10012207, "sequenceId": 1001, "price": 20901, ...},
  3. { "id": 10022207, "sequenceId": 1002, "price": 20902, ...},
  4. ]

撮合引擎可序列化为买卖盘列表(仅包含订单ID):

  1. {
  2. "BUY": [10012207, 10022207, ...],
  3. "SELL": [...],
  4. "marketPrice": 20901
  5. }

最后合并为一个交易引擎的状态文件:

  1. {
  2. "sequenceId": 189000,
  3. "assets": { ... },
  4. "orders": [ ... ],
  5. "match": { ... }
  6. }

交易引擎启动时,读取状态文件,然后依次恢复资产系统、订单系统和撮合引擎的状态,就得到了指定sequenceId的状态。

写入状态时,如果是异步写入,需要先复制状态、再写入,防止多线程读同一实例导致状态不一致。读写JSON时,要使用JSON库的流式API(例如Jackson的Streaming API),以免内存溢出。对BigDecimal进行序列化时,要注意不要误读为double类型以免丢失精度。

参考源码

可以从GitHubGitee下载源码。

GitHubmichaelliaowarpexchange/

▸ build)

▸ sql)

▤ schema.sql)

▤ docker-compose.yml)

▤ pom.xml)

▸ common)

▸ src/main)

▸ java/com/itranswarp/exchange)

▸ bean)

▤ OrderBookBean.java)

▤ OrderBookItemBean.java)

▸ db)

▤ AccessibleProperty.java)

▤ Criteria.java)

▤ CriteriaQuery.java)

▤ DbTemplate.java)

▤ From.java)

▤ Limit.java)

▤ Mapper.java)

▤ OrderBy.java)

▤ Select.java)

▤ Where.java)

▸ enums)

▤ AssetEnum.java)

▤ Direction.java)

▤ MatchType.java)

▤ OrderStatus.java)

▤ UserType.java)

▸ message)

▸ event)

▤ AbstractEvent.java)

▤ OrderCancelEvent.java)

▤ OrderRequestEvent.java)

▤ TransferEvent.java)

▤ AbstractMessage.java)

▤ ApiResultMessage.java)

▤ NotificationMessage.java)

▤ TickMessage.java)

▸ messaging)

▤ BatchMessageHandler.java)

▤ MessageConsumer.java)

▤ MessageProducer.java)

▤ MessageTypes.java)

▤ Messaging.java)

▤ MessagingConfiguration.java)

▤ MessagingFactory.java)

▸ model)

▸ quotation)

▤ TickEntity.java)

▸ support)

▤ EntitySupport.java)

▸ trade)

▤ EventEntity.java)

▤ MatchDetailEntity.java)

▤ OrderEntity.java)

▸ redis)

▤ RedisCache.java)

▤ RedisConfiguration.java)

▤ RedisService.java)

▤ SyncCommandCallback.java)

▸ support)

▤ LoggerSupport.java)

▸ util)

▤ ByteUtil.java)

▤ ClassPathUtil.java)

▤ IpUtil.java)

▤ JsonUtil.java)

▤ ApiError.java)

▤ ApiErrorResponse.java)

▤ ApiException.java)

▸ resources)

▸ redis)

▤ update-orderbook.lua)

▤ logback-spring.xml)

▤ pom.xml)

▸ config)

▸ src/main)

▸ java/com/itranswarp/exchange/config)

▤ ConfigApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▸ config-repo)

▤ application-default.yml)

▤ application-test.yml)

▤ application.yml)

▤ push.yml)

▤ quotation.yml)

▤ trading-api.yml)

▤ trading-engine.yml)

▤ trading-sequencer.yml)

▤ ui-default.yml)

▤ ui.yml)

▸ parent)

▤ pom.xml)

▸ push)

▸ src/main)

▸ java/com/itranswarp/exchange/push)

▤ PushApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▸ quotation)

▸ src/main)

▸ java/com/itranswarp/exchange)

▤ QuotationApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▸ trading-api)

▸ src/main)

▸ java/com/itranswarp/exchange)

▤ TradingApiApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▸ trading-engine)

▸ src)

▸ main)

▸ java/com/itranswarp/exchange)

▸ assets)

▤ Asset.java)

▤ AssetService.java)

▤ Transfer.java)

▸ clearing)

▤ ClearingService.java)

▸ match)

▤ MatchDetailRecord.java)

▤ MatchEngine.java)

▤ MatchResult.java)

▤ OrderBook.java)

▤ OrderKey.java)

▸ order)

▤ OrderService.java)

▸ store)

▤ StoreService.java)

▸ web/api)

▤ InternalTradingEngineApiController.java)

▤ TradingEngineApplication.java)

▤ TradingEngineService.java)

▸ resources)

▤ application.yml)

▸ test/java/com/itranswarp/exchange)

▸ assets)

▤ AssetServiceTest.java)

▸ match)

▤ MatchEngineTest.java)

▤ TradingEngineServiceTest.java)

▤ pom.xml)

▸ trading-sequencer)

▸ src/main)

▸ java/com/itranswarp/exchange)

▤ TradingSequencerApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▸ ui)

▸ src/main)

▸ java/com/itranswarp/exchange)

▤ UIApplication.java)

▸ resources)

▤ application.yml)

▤ pom.xml)

▤ .gitignore)

▤ LICENSE)

▤ README.md)

小结

交易引擎是以事件驱动的状态机模型,同样的输入将得到同样的输出。为提高交易系统的健壮性,可以自动检测重复消息和消息丢失并自动恢复。

读后有收获可以支付宝请作者喝咖啡:

完成交易引擎 - 图1