我不是权威,文章内容仅为个人的经验,请大家持批判的态度阅读。如有不当恳请斧正。
很多人对使用python设计低延时系统的做法嗤之以鼻,我却不以为意。
我追求的并不是低延时,几十毫秒的延时根本不会产生什么本质影响。我只是好奇时间都去哪了?有没有办法让时间慢一些?所以,我寻求的是一个答案,关于python系统的延时极限的答案。相信会有和我一样好奇的朋友。虽然可能没有多大的意义,但是很有意思。
AlgoPlus就是在我好奇心驱使下交出的答卷,或许可以作为这个问题的一个小小的注解。
Cython封装C
- .pxd是与.h类似的头文件声明(目录\AlgoPlus\src\CTP\cython2c中的文件);
- .pyx是与.c类似的关于声明的具体实现(目录\AlgoPlus\src\AlgoPlus\CTP中的MdApi.pyx与TraderApi.pyx);
- cimport将.pxd的声明导入.pyx中;
- cdef用来声明类、方法、变量;
- 语句 with nogil: 声明了一个不需要GIL就能执行的代码块。 在这个块中,不能有任何的普通Python对象——只能使用被声明为cdef的对象和被显示声明为可以不持有GIL执行的函数。
- 完全兼容python语法;
- 编译后得到.pyd文件,可以被作为库导入到python程序中。
以TraderApi的确认结算单为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
# ReqOrderInsert(CThostFtdcInputOrderField *pInputOrder, int nRequestID) # ///报单录入请求 # 《CTP量化投资API手册(3)TraderApi基础交易》 | http://7jia.com/70003.html def ReqOrderInsert(self, pInputOrder): cdef int result = -1 cdef int nRequestID cdef size_t address = 0 try: nRequestID = self.Inc_RequestID() if self._api is not NULL: address = addressof(pInputOrder) with nogil: result = self._api.ReqOrderInsert(<CThostFtdcInputOrderField *> address, nRequestID) except Exception as err_msg: self._write_log("ReqOrderInsert", err_msg) finally: return result |
如果有必要,也可以使用Cython将对延时极度敏感的策略代码转移到C语言扩展模块中编译后运行。
ctypes
ctypes支持python类型与c类型的无缝衔接,在调用C时会自动释放GIL,能以极低的代价通过c扩展提升程序性能。
AlgoPlus使用ctypes.Structure封装CTP官方支持的所有结构体变量。以录入报单为例,在策略(交易进程)中通过传递报单结构体参数调用父类TraderApi的ReqOrderInsert方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# 报单 def req_order_insert(self, exchange_id, instrument_id, order_price, order_vol, order_ref, direction, offset_flag): input_order_field = InputOrderField( BrokerID=self.broker_id, InvestorID=self.investor_id, ExchangeID=exchange_id, InstrumentID=instrument_id, UserID=self.investor_id, OrderPriceType="2", Direction=direction, CombOffsetFlag=offset_flag, CombHedgeFlag="1", LimitPrice=order_price, VolumeTotalOriginal=order_vol, TimeCondition="3", VolumeCondition="1", MinVolume=1, ContingentCondition="1", StopPrice=0, ForceCloseReason="0", IsAutoSuspend=0, OrderRef=str(order_ref), ) l_retVal = self.ReqOrderInsert(input_order_field) # self._write_log(f"req_order_insert=>InputOrderField=>address:{addressof(input_order_field)}") # 内存地址 |
父类TraderApi的中,只需对参数地址进行转换为c类型指针,就可以调用CTP官方API的ReqOrderInsert方法实现录入报单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# ReqOrderInsert(CThostFtdcInputOrderField *pInputOrder, int nRequestID) # ///报单录入请求 # 《CTP量化投资API手册(3)TraderApi基础交易》 | http://7jia.com/70003.html def ReqOrderInsert(self, pInputOrder): cdef int result = -1 cdef int nRequestID cdef size_t address = 0 try: nRequestID = self.Inc_RequestID() if self._api is not NULL: address = addressof(pInputOrder) with nogil: result = self._api.ReqOrderInsert(<CThostFtdcInputOrderField *> address, nRequestID) # self._write_log(f"ReqOrderInsert=>InputOrderField=>address:{address}") # 内存地址 except Exception as err_msg: self._write_log("ReqOrderInsert", err_msg) finally: return result |
此过程不涉及内存拷贝,参考以上代码将相应变量地址打印出来,即可验证。
ctype.char只能与bytes转换,所以使用字符串变量时,尽量直接使用bytes,而不是str。举个简单的例子,如果需要创建以InstrumentID为键值的字典,如果使用bytes,结构体的InstrumentID字段值就能直接进行索引,否则需要经过一次转换。
对于一些比较常用的结构体变量,可以在程序初始化时创建,使用时直接进行赋值,可节省临时创建申请空间的时间。CTP官方就是这样做的,有兴趣可以输出回调变量地址看一下。
当需要进一步提升性能时,例如对策略编译为pyx,或者使用内存映射实现进程通信、保存数据,ctypes定义的数据类型无需调整即可兼容这些c扩展技术。
另外,ctypes定义的变量也适用于bs架构的网络通信。
CTP的特性
使用CTP后台的账户可以有多个在多个登录点同时在线,simnow模拟账户最多支持2个登陆点,实盘账户支持的更多(有些期货公司可以支持8个)。
TraderApi具备完善的回调机制,所有在线的登录点都能同时接收到回报、通知等数据。所以,低延时策略应该充分利用回调信息维护本地数据,例如实时持仓数量、实时盈亏等,避免主动查询。不建议主动查询的另一个原因是CTP每秒只响应一次主动查询。
如果单账户策略数量少于最大登录点,可以为每个策略开启一个独立进程,使用OrderRef字段作为标识,过滤不相关的数据,由CTP接口回调事件驱动策略运行。单账户策略比较多的场景,可以分配N1个延时敏感进程,N2个次延时敏感进程,N3个延时不敏感进程,不同的策略通过数据共享机制与其相对应的进程通信。总之,TraderApi已经提供了多线程/进程间数据共享的机制,无论任何时候都应该是我们的首选,只有当CTP的共享线程数量不足时,我们才考虑其他共享技术进行配置,例如Queue、Redis、MMAP等。
MdApi只能接收到线程/进程内订阅的合约的行情通知,多合约场景,通过创建不同延时级别的线程/进程收发行情可以有效避免多合约的延时。
AlgoPlus就是充分利用TraderApi和MdApi这些特性来实现多账户、多合约、多策略的(低延时)应用设计。
MdApi和TraderApi之间需要外部数据共享方案,AlgoPlus推荐的是Queue,所以大家会在很多例子中看到以下启动策略的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
share_queue = Queue(maxsize=100) # 共享队列 if __name__ == "__main__": import sys sys.path.append("..") from account_info import my_future_account_info_dict future_account = my_future_account_info_dict['SimNow24'] # 行情进程 md_process = Process(target=TickEngine, args=(future_account.server_dict['MDServer'] , future_account.broker_id , future_account.investor_id , future_account.password , future_account.app_id , future_account.auth_code , future_account.instrument_id_list , [share_queue] , future_account.md_page_dir) ) # 交易进程 trader_process = Process(target=TraderEngine, args=(future_account.server_dict['TDServer'] , future_account.broker_id , future_account.investor_id , future_account.password , future_account.app_id , future_account.auth_code , share_queue , future_account.td_page_dir) ) md_process.start() trader_process.start() md_process.join() trader_process.join() |
创建一/多个交易进程和行情进程,行情进程将收到的行情通知放到队列里,交易进程从队列里把和自己相关的行情取出来,从而驱动交易逻辑的判断,最终完成交易。
之所有应用多进程而不是多线程,是因为python的进程使用的解释器是相互独立的,所以不会受限于GIL,可以充分利用多核CPU的优势。
AlgoPlus的定位及规划
AlgoPlus项目准备做三件事情:
首先,将C++翻译成高性能的python代码,“信达雅”是评价这项工作的唯一标准。除非CTP官方进行较大升级,否则,AlgoPlus发布的python版API比较稳定。未来如果有机会,我们也希望能够开展证券方面的业务。
其次,通过对AlgoPlus的扩展构建量化交易解决方案。例如日内交易策略模板、趋势交易策略模板、套利策略模板、bs架构设计、ui设计、数据库应用等待。
最后,研究。前期主要精力放在偏向技术的研究,后期将转型向交易相关研究发展。
欢迎大家参与到自己感兴趣的环节中,共同交流。
参考资料
GIL
尽管Python完全支持多线程编程,但是解释器的C语言实现部分在完全并行执行时并不是线程安全的。实际上,解释器被一个全局解释器锁保护着,它确保任何时候都只有一个Python线程执行。GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势(比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。
AlgoPlus使用Cython封装CTP官方API,不仅释放了GIL,而且将代码转移到C语言扩展模块中,大大提升了系统性能。
Cython
ctypes — Python 的外部函数库
https://docs.python.org/zh-cn/3/library/ctypes.html
AlgoPlus项目地址:
不要吝啬您的star。来实现我的小目标:项目star数量达到100,开始制作视频教程。
码云:https://gitee.com/AlgoPlus/AlgoPlus
Github:https://github.com/CTPPlus/AlgoPlus
评论前必须登录!
注册