实盘运行
在实盘环境,用户可以基于编写好的CTA策略来创建新的实例,一键初始化、启动、停止策略。
创建策略实例
用户可以基于编写好的CTA策略来创建新的实例,策略实例的好处在于同一个策略可以同时去运行多个品种合约,并且每个实例的参数可以是不同的。在创建实例的时候需要填写如图的实例名称、合约品种、参数设置。注意:实例名称不能重名;合约名称是vt_symbol的格式,如IF1905.CFFEX。
创建策略流程如下:
检查策略实例重名
添加策略配置信息(strategy_name, vt_symbol, setting)到strategies字典上
添加该策略要订阅行情的合约信息到symbol_strategy_map字典中;
把策略配置信息保存到json文件内;
在图形化界面更新状态信息。
- def add_strategy(
- self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict
- ):
- """
- Add a new strategy.
- """
- if strategy_name in self.strategies:
- self.write_log(f"创建策略失败,存在重名{strategy_name}")
- return
- strategy_class = self.classes[class_name]
- strategy = strategy_class(self, strategy_name, vt_symbol, setting)
- self.strategies[strategy_name] = strategy
- # Add vt_symbol to strategy map.
- strategies = self.symbol_strategy_map[vt_symbol]
- strategies.append(strategy)
- # Update to setting file.
- self.update_strategy_setting(strategy_name, setting)
- self.put_strategy_event(strategy)
初始化策略
调用策略类的on_init()回调函数,并且载入历史数据;
恢复上次退出之前的策略状态;
从.vntrader/cta_strategy_data.json内读取策略参数,最新的技术指标,以及持仓数量;
调用接口的subcribe()函数订阅指定行情信息;
策略初始化状态变成True,并且更新到日志上。
- def _init_strategy(self):
- """
- Init strategies in queue.
- """
- while not self.init_queue.empty():
- strategy_name = self.init_queue.get()
- strategy = self.strategies[strategy_name]
- if strategy.inited:
- self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
- continue
- self.write_log(f"{strategy_name}开始执行初始化")
- # Call on_init function of strategy
- self.call_strategy_func(strategy, strategy.on_init)
- # Restore strategy data(variables)
- data = self.strategy_data.get(strategy_name, None)
- if data:
- for name in strategy.variables:
- value = data.get(name, None)
- if value:
- setattr(strategy, name, value)
- # Subscribe market data
- contract = self.main_engine.get_contract(strategy.vt_symbol)
- if contract:
- req = SubscribeRequest(
- symbol=contract.symbol, exchange=contract.exchange)
- self.main_engine.subscribe(req, contract.gateway_name)
- else:
- self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)
- # Put event to update init completed status.
- strategy.inited = True
- self.put_strategy_event(strategy)
- self.write_log(f"{strategy_name}初始化完成")
- self.init_thread = None
启动策略
检查策略初始化状态;
检查策略启动状态,避免重复启动;
调用策略类的on_start()函数启动策略;
策略启动状态变成True,并且更新到图形化界面上。
- def start_strategy(self, strategy_name: str):
- """
- Start a strategy.
- """
- strategy = self.strategies[strategy_name]
- if not strategy.inited:
- self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化")
- return
- if strategy.trading:
- self.write_log(f"{strategy_name}已经启动,请勿重复操作")
- return
- self.call_strategy_func(strategy, strategy.on_start)
- strategy.trading = True
- self.put_strategy_event(strategy)
停止策略
检查策略启动状态;
调用策略类的on_stop()函数停止策略;
更新策略启动状态为False;
对所有为成交的委托(市价单/限价单/本地停止单)进行撤单操作;
把策略参数,最新的技术指标,以及持仓数量保存到.vntrader/cta_strategy_data.json内;
在图形化界面更新策略状态。
- def stop_strategy(self, strategy_name: str):
- """
- Stop a strategy.
- """
- strategy = self.strategies[strategy_name]
- if not strategy.trading:
- return
- # Call on_stop function of the strategy
- self.call_strategy_func(strategy, strategy.on_stop)
- # Change trading status of strategy to False
- strategy.trading = False
- # Cancel all orders of the strategy
- self.cancel_all(strategy)
- # Sync strategy variables to data file
- self.sync_strategy_data(strategy)
- # Update GUI
- self.put_strategy_event(strategy)
编辑策略
重新配置策略参数字典setting;
更新参数字典到策略中;
在图像化界面更新策略状态。
- def edit_strategy(self, strategy_name: str, setting: dict):
- """
- Edit parameters of a strategy.
- """
- strategy = self.strategies[strategy_name]
- strategy.update_setting(setting)
- self.update_strategy_setting(strategy_name, setting)
- self.put_strategy_event(strategy)
移除策略
检查策略状态,只有停止策略后从可以移除策略;
从json文件移除策略配置信息(strategy_name, vt_symbol, setting);
从symbol_strategy_map字典中移除该策略订阅的合约信息;
从strategy_orderid_map字典移除活动委托记录;
从strategies字典移除该策略的相关配置信息。
- def remove_strategy(self, strategy_name: str):
- """
- Remove a strategy.
- """
- strategy = self.strategies[strategy_name]
- if strategy.trading:
- self.write_log(f"策略{strategy.strategy_name}移除失败,请先停止")
- return
- # Remove setting
- self.remove_strategy_setting(strategy_name)
- # Remove from symbol strategy map
- strategies = self.symbol_strategy_map[strategy.vt_symbol]
- strategies.remove(strategy)
- # Remove from active orderid map
- if strategy_name in self.strategy_orderid_map:
- vt_orderids = self.strategy_orderid_map.pop(strategy_name)
- # Remove vt_orderid strategy map
- for vt_orderid in vt_orderids:
- if vt_orderid in self.orderid_strategy_map:
- self.orderid_strategy_map.pop(vt_orderid)
- # Remove from strategies
- self.strategies.pop(strategy_name)
- return True
锁仓操作
用户在编写策略时,可以通过填写lock字段来让策略完成锁仓操作,即禁止平今,通过反向开仓来代替。
- 在cta策略模板template中,可以看到如下具体委托函数都有lock字段,并且默认为False。
- def buy(self, price: float, volume: float, stop: bool = False, lock: bool = False):
- """
- Send buy order to open a long position.
- """
- return self.send_order(Direction.LONG, Offset.OPEN, price, volume, stop, lock)
- def sell(self, price: float, volume: float, stop: bool = False, lock: bool = False):
- """
- Send sell order to close a long position.
- """
- return self.send_order(Direction.SHORT, Offset.CLOSE, price, volume, stop, lock)
- def short(self, price: float, volume: float, stop: bool = False, lock: bool = False):
- """
- Send short order to open as short position.
- """
- return self.send_order(Direction.SHORT, Offset.OPEN, price, volume, stop, lock)
- def cover(self, price: float, volume: float, stop: bool = False, lock: bool = False):
- """
- Send cover order to close a short position.
- """
- return self.send_order(Direction.LONG, Offset.CLOSE, price, volume, stop, lock)
- def send_order(
- self,
- direction: Direction,
- offset: Offset,
- price: float,
- volume: float,
- stop: bool = False,
- lock: bool = False
- ):
- """
- Send a new order.
- """
- if self.trading:
- vt_orderids = self.cta_engine.send_order(
- self, direction, offset, price, volume, stop, lock
- )
- return vt_orderids
- else:
- return []
- 设置lock=True后,cta实盘引擎send_order()函数发生响应,并且调用其最根本的委托函数send_server_order()去处理锁仓委托转换。首先是创建原始委托original_req,然后调用converter文件里面OffsetConverter类的convert_order_request来进行相关转换。
- def send_order(
- self,
- strategy: CtaTemplate,
- direction: Direction,
- offset: Offset,
- price: float,
- volume: float,
- stop: bool,
- lock: bool
- ):
- """
- """
- contract = self.main_engine.get_contract(strategy.vt_symbol)
- if not contract:
- self.write_log(f"委托失败,找不到合约:{strategy.vt_symbol}", strategy)
- return ""
- if stop:
- if contract.stop_supported:
- return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock)
- else:
- return self.send_local_stop_order(strategy, direction, offset, price, volume, lock)
- else:
- return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock)
- def send_limit_order(
- self,
- strategy: CtaTemplate,
- contract: ContractData,
- direction: Direction,
- offset: Offset,
- price: float,
- volume: float,
- lock: bool
- ):
- """
- Send a limit order to server.
- """
- return self.send_server_order(
- strategy,
- contract,
- direction,
- offset,
- price,
- volume,
- OrderType.LIMIT,
- lock
- )
- def send_server_order(
- self,
- strategy: CtaTemplate,
- contract: ContractData,
- direction: Direction,
- offset: Offset,
- price: float,
- volume: float,
- type: OrderType,
- lock: bool
- ):
- """
- Send a new order to server.
- """
- # Create request and send order.
- original_req = OrderRequest(
- symbol=contract.symbol,
- exchange=contract.exchange,
- direction=direction,
- offset=offset,
- type=type,
- price=price,
- volume=volume,
- )
- # Convert with offset converter
- req_list = self.offset_converter.convert_order_request(original_req, lock)
- # Send Orders
- vt_orderids = []
- for req in req_list:
- vt_orderid = self.main_engine.send_order(
- req, contract.gateway_name)
- vt_orderids.append(vt_orderid)
- self.offset_converter.update_order_request(req, vt_orderid)
- # Save relationship between orderid and strategy.
- self.orderid_strategy_map[vt_orderid] = strategy
- self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid)
- return vt_orderids
- 在convert_order_request_lock()函数中,先计算今仓的量和昨可用量;然后进行判断:若有今仓,只能开仓(锁仓);无今仓时候,若平仓量小于等于昨可用,全部平昨,反之,先平昨,剩下的反向开仓。
- def convert_order_request_lock(self, req: OrderRequest):
- """"""
- if req.direction == Direction.LONG:
- td_volume = self.short_td
- yd_available = self.short_yd - self.short_yd_frozen
- else:
- td_volume = self.long_td
- yd_available = self.long_yd - self.long_yd_frozen
- # If there is td_volume, we can only lock position
- if td_volume:
- req_open = copy(req)
- req_open.offset = Offset.OPEN
- return [req_open]
- # If no td_volume, we close opposite yd position first
- # then open new position
- else:
- open_volume = max(0, req.volume - yd_available)
- req_list = []
- if yd_available:
- req_yd = copy(req)
- if self.exchange == Exchange.SHFE:
- req_yd.offset = Offset.CLOSEYESTERDAY
- else:
- req_yd.offset = Offset.CLOSE
- req_list.append(req_yd)
- if open_volume:
- req_open = copy(req)
- req_open.offset = Offset.OPEN
- req_open.volume = open_volume
- req_list.append(req_open)
- return req_list