实盘运行

在实盘环境,用户可以基于编写好的CTA策略来创建新的实例,一键初始化、启动、停止策略。

创建策略实例

用户可以基于编写好的CTA策略来创建新的实例,策略实例的好处在于同一个策略可以同时去运行多个品种合约,并且每个实例的参数可以是不同的。在创建实例的时候需要填写如图的实例名称、合约品种、参数设置。注意:实例名称不能重名;合约名称是vt_symbol的格式,如IF1905.CFFEX。

https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/cta_strategy/add_strategy.png

创建策略流程如下:

  • 检查策略实例重名

  • 添加策略配置信息(strategy_name, vt_symbol, setting)到strategies字典上

  • 添加该策略要订阅行情的合约信息到symbol_strategy_map字典中;

  • 把策略配置信息保存到json文件内;

  • 在图形化界面更新状态信息。

  1. def add_strategy(
  2. self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict
  3. ):
  4. """
  5. Add a new strategy.
  6. """
  7. if strategy_name in self.strategies:
  8. self.write_log(f"创建策略失败,存在重名{strategy_name}")
  9. return
  10.  
  11. strategy_class = self.classes[class_name]
  12.  
  13. strategy = strategy_class(self, strategy_name, vt_symbol, setting)
  14. self.strategies[strategy_name] = strategy
  15.  
  16. # Add vt_symbol to strategy map.
  17. strategies = self.symbol_strategy_map[vt_symbol]
  18. strategies.append(strategy)
  19.  
  20. # Update to setting file.
  21. self.update_strategy_setting(strategy_name, setting)
  22.  
  23. self.put_strategy_event(strategy)

初始化策略

  • 调用策略类的on_init()回调函数,并且载入历史数据;

  • 恢复上次退出之前的策略状态;

  • 从.vntrader/cta_strategy_data.json内读取策略参数,最新的技术指标,以及持仓数量;

  • 调用接口的subcribe()函数订阅指定行情信息;

  • 策略初始化状态变成True,并且更新到日志上。

  1. def _init_strategy(self):
  2. """
  3. Init strategies in queue.
  4. """
  5. while not self.init_queue.empty():
  6. strategy_name = self.init_queue.get()
  7. strategy = self.strategies[strategy_name]
  8.  
  9. if strategy.inited:
  10. self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
  11. continue
  12.  
  13. self.write_log(f"{strategy_name}开始执行初始化")
  14.  
  15. # Call on_init function of strategy
  16. self.call_strategy_func(strategy, strategy.on_init)
  17.  
  18. # Restore strategy data(variables)
  19. data = self.strategy_data.get(strategy_name, None)
  20. if data:
  21. for name in strategy.variables:
  22. value = data.get(name, None)
  23. if value:
  24. setattr(strategy, name, value)
  25.  
  26. # Subscribe market data
  27. contract = self.main_engine.get_contract(strategy.vt_symbol)
  28. if contract:
  29. req = SubscribeRequest(
  30. symbol=contract.symbol, exchange=contract.exchange)
  31. self.main_engine.subscribe(req, contract.gateway_name)
  32. else:
  33. self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)
  34.  
  35. # Put event to update init completed status.
  36. strategy.inited = True
  37. self.put_strategy_event(strategy)
  38. self.write_log(f"{strategy_name}初始化完成")
  39.  
  40. self.init_thread = None

启动策略

  • 检查策略初始化状态;

  • 检查策略启动状态,避免重复启动;

  • 调用策略类的on_start()函数启动策略;

  • 策略启动状态变成True,并且更新到图形化界面上。

  1. def start_strategy(self, strategy_name: str):
  2. """
  3. Start a strategy.
  4. """
  5. strategy = self.strategies[strategy_name]
  6. if not strategy.inited:
  7. self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化")
  8. return
  9.  
  10. if strategy.trading:
  11. self.write_log(f"{strategy_name}已经启动,请勿重复操作")
  12. return
  13.  
  14. self.call_strategy_func(strategy, strategy.on_start)
  15. strategy.trading = True
  16.  
  17. self.put_strategy_event(strategy)

停止策略

  • 检查策略启动状态;

  • 调用策略类的on_stop()函数停止策略;

  • 更新策略启动状态为False;

  • 对所有为成交的委托(市价单/限价单/本地停止单)进行撤单操作;

  • 把策略参数,最新的技术指标,以及持仓数量保存到.vntrader/cta_strategy_data.json内;

  • 在图形化界面更新策略状态。

  1. def stop_strategy(self, strategy_name: str):
  2. """
  3. Stop a strategy.
  4. """
  5. strategy = self.strategies[strategy_name]
  6. if not strategy.trading:
  7. return
  8.  
  9. # Call on_stop function of the strategy
  10. self.call_strategy_func(strategy, strategy.on_stop)
  11.  
  12. # Change trading status of strategy to False
  13. strategy.trading = False
  14.  
  15. # Cancel all orders of the strategy
  16. self.cancel_all(strategy)
  17.  
  18. # Sync strategy variables to data file
  19. self.sync_strategy_data(strategy)
  20.  
  21. # Update GUI
  22. self.put_strategy_event(strategy)

编辑策略

  • 重新配置策略参数字典setting;

  • 更新参数字典到策略中;

  • 在图像化界面更新策略状态。

  1. def edit_strategy(self, strategy_name: str, setting: dict):
  2. """
  3. Edit parameters of a strategy.
  4. """
  5. strategy = self.strategies[strategy_name]
  6. strategy.update_setting(setting)
  7.  
  8. self.update_strategy_setting(strategy_name, setting)
  9. self.put_strategy_event(strategy)

移除策略

  • 检查策略状态,只有停止策略后从可以移除策略;

  • 从json文件移除策略配置信息(strategy_name, vt_symbol, setting);

  • 从symbol_strategy_map字典中移除该策略订阅的合约信息;

  • 从strategy_orderid_map字典移除活动委托记录;

  • 从strategies字典移除该策略的相关配置信息。

  1. def remove_strategy(self, strategy_name: str):
  2. """
  3. Remove a strategy.
  4. """
  5. strategy = self.strategies[strategy_name]
  6. if strategy.trading:
  7. self.write_log(f"策略{strategy.strategy_name}移除失败,请先停止")
  8. return
  9.  
  10. # Remove setting
  11. self.remove_strategy_setting(strategy_name)
  12.  
  13. # Remove from symbol strategy map
  14. strategies = self.symbol_strategy_map[strategy.vt_symbol]
  15. strategies.remove(strategy)
  16.  
  17. # Remove from active orderid map
  18. if strategy_name in self.strategy_orderid_map:
  19. vt_orderids = self.strategy_orderid_map.pop(strategy_name)
  20.  
  21. # Remove vt_orderid strategy map
  22. for vt_orderid in vt_orderids:
  23. if vt_orderid in self.orderid_strategy_map:
  24. self.orderid_strategy_map.pop(vt_orderid)
  25.  
  26. # Remove from strategies
  27. self.strategies.pop(strategy_name)
  28.  
  29. return True

锁仓操作

用户在编写策略时,可以通过填写lock字段来让策略完成锁仓操作,即禁止平今,通过反向开仓来代替。

  • 在cta策略模板template中,可以看到如下具体委托函数都有lock字段,并且默认为False。
  1. def buy(self, price: float, volume: float, stop: bool = False, lock: bool = False):
  2. """
  3. Send buy order to open a long position.
  4. """
  5. return self.send_order(Direction.LONG, Offset.OPEN, price, volume, stop, lock)
  6.  
  7. def sell(self, price: float, volume: float, stop: bool = False, lock: bool = False):
  8. """
  9. Send sell order to close a long position.
  10. """
  11. return self.send_order(Direction.SHORT, Offset.CLOSE, price, volume, stop, lock)
  12.  
  13. def short(self, price: float, volume: float, stop: bool = False, lock: bool = False):
  14. """
  15. Send short order to open as short position.
  16. """
  17. return self.send_order(Direction.SHORT, Offset.OPEN, price, volume, stop, lock)
  18.  
  19. def cover(self, price: float, volume: float, stop: bool = False, lock: bool = False):
  20. """
  21. Send cover order to close a short position.
  22. """
  23. return self.send_order(Direction.LONG, Offset.CLOSE, price, volume, stop, lock)
  24.  
  25. def send_order(
  26. self,
  27. direction: Direction,
  28. offset: Offset,
  29. price: float,
  30. volume: float,
  31. stop: bool = False,
  32. lock: bool = False
  33. ):
  34. """
  35. Send a new order.
  36. """
  37. if self.trading:
  38. vt_orderids = self.cta_engine.send_order(
  39. self, direction, offset, price, volume, stop, lock
  40. )
  41. return vt_orderids
  42. else:
  43. return []
  • 设置lock=True后,cta实盘引擎send_order()函数发生响应,并且调用其最根本的委托函数send_server_order()去处理锁仓委托转换。首先是创建原始委托original_req,然后调用converter文件里面OffsetConverter类的convert_order_request来进行相关转换。
  1. def send_order(
  2. self,
  3. strategy: CtaTemplate,
  4. direction: Direction,
  5. offset: Offset,
  6. price: float,
  7. volume: float,
  8. stop: bool,
  9. lock: bool
  10. ):
  11. """
  12. """
  13. contract = self.main_engine.get_contract(strategy.vt_symbol)
  14. if not contract:
  15. self.write_log(f"委托失败,找不到合约:{strategy.vt_symbol}", strategy)
  16. return ""
  17.  
  18. if stop:
  19. if contract.stop_supported:
  20. return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock)
  21. else:
  22. return self.send_local_stop_order(strategy, direction, offset, price, volume, lock)
  23. else:
  24. return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock)
  25.  
  26. def send_limit_order(
  27. self,
  28. strategy: CtaTemplate,
  29. contract: ContractData,
  30. direction: Direction,
  31. offset: Offset,
  32. price: float,
  33. volume: float,
  34. lock: bool
  35. ):
  36. """
  37. Send a limit order to server.
  38. """
  39. return self.send_server_order(
  40. strategy,
  41. contract,
  42. direction,
  43. offset,
  44. price,
  45. volume,
  46. OrderType.LIMIT,
  47. lock
  48. )
  49.  
  50. def send_server_order(
  51. self,
  52. strategy: CtaTemplate,
  53. contract: ContractData,
  54. direction: Direction,
  55. offset: Offset,
  56. price: float,
  57. volume: float,
  58. type: OrderType,
  59. lock: bool
  60. ):
  61. """
  62. Send a new order to server.
  63. """
  64. # Create request and send order.
  65. original_req = OrderRequest(
  66. symbol=contract.symbol,
  67. exchange=contract.exchange,
  68. direction=direction,
  69. offset=offset,
  70. type=type,
  71. price=price,
  72. volume=volume,
  73. )
  74.  
  75. # Convert with offset converter
  76. req_list = self.offset_converter.convert_order_request(original_req, lock)
  77.  
  78. # Send Orders
  79. vt_orderids = []
  80.  
  81. for req in req_list:
  82. vt_orderid = self.main_engine.send_order(
  83. req, contract.gateway_name)
  84. vt_orderids.append(vt_orderid)
  85.  
  86. self.offset_converter.update_order_request(req, vt_orderid)
  87.  
  88. # Save relationship between orderid and strategy.
  89. self.orderid_strategy_map[vt_orderid] = strategy
  90. self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid)
  91.  
  92. return vt_orderids
  • 在convert_order_request_lock()函数中,先计算今仓的量和昨可用量;然后进行判断:若有今仓,只能开仓(锁仓);无今仓时候,若平仓量小于等于昨可用,全部平昨,反之,先平昨,剩下的反向开仓。
  1. def convert_order_request_lock(self, req: OrderRequest):
  2. """"""
  3. if req.direction == Direction.LONG:
  4. td_volume = self.short_td
  5. yd_available = self.short_yd - self.short_yd_frozen
  6. else:
  7. td_volume = self.long_td
  8. yd_available = self.long_yd - self.long_yd_frozen
  9.  
  10. # If there is td_volume, we can only lock position
  11. if td_volume:
  12. req_open = copy(req)
  13. req_open.offset = Offset.OPEN
  14. return [req_open]
  15. # If no td_volume, we close opposite yd position first
  16. # then open new position
  17. else:
  18. open_volume = max(0, req.volume - yd_available)
  19. req_list = []
  20.  
  21. if yd_available:
  22. req_yd = copy(req)
  23. if self.exchange == Exchange.SHFE:
  24. req_yd.offset = Offset.CLOSEYESTERDAY
  25. else:
  26. req_yd.offset = Offset.CLOSE
  27. req_list.append(req_yd)
  28.  
  29. if open_volume:
  30. req_open = copy(req)
  31. req_open.offset = Offset.OPEN
  32. req_open.volume = open_volume
  33. req_list.append(req_open)
  34.  
  35. return req_list