我在部署实盘时,用一个期货公司账户,交易螺纹和热卷两个品种的5分钟线,而每个品种下又要跑6个不同setting的策略。 比如螺纹的6个setting,使用的都是同一个数据(螺纹主力,5分钟k线),此时我有几个选择
先说(a),且不说6个机器人的费用,交易所会限制查询和交易的间隔,这么做可能会导致部分机器人数据更新的不及时;而且中间存在6次重复的查询,没必要。 再看(b),for循环中计算指标和开仓要依次进行,这样6个setting有的就要慢一拍;而且你如果只用主程序串行执行代码,那开仓可以5分钟一次,止损难道也是5分钟一次吗,显然不合理。 而(c)这个虽然分发过程中也会存在耗时,但是相比前两个应该会少很多。
关于分发的方法,我考虑了几种可能,并尝试了下:
【方法】:内存共享,数据收集进程每次修改共享内存的数据,其他6个策略进程则依次访问这块内存; 【尝试结果】:实现过程中,作为程序员觉得太不优雅,数据收集进程对于这块内存有读写权限,其他6个策略进程明明只需要写入权限;可是python里面这块内存不论读写都要lock住;而且没有阻塞功能,需要自己判断数据是否更新了;写着写着,越来越无法承受写出这种不优雅代码的负罪感;作罢
【方法】:网络通信,数据收集进程创建一个http协议,然后其他6个策略进程不断访问这个http接口,并且判断是否数据更新了; 【尝试结果】:这个代码实现倒是非常简单,但是显而易见的——都在本机了还开本地网络接口传输干什么,想把自己慢死吗?
【方法】:多进程的数据通信 【尝试结果】:没接受到数据会自动阻塞,分发速度较快,不存在多进程访问同一份内存数据的隐患;尝试了几种多进程通信的方法(Pipe,Queue,Manager),测试了下速度,最后选择了最快的Pipe。参考这篇知乎文章
这里贴一下多线程分发的最小实现代码,在本机和服务器的测速中,发送下面的这个330*10的一个pandas的dataframe,一个数据的发送到接受用了1-2ms。由于是依次发送,发完全部16个最大延迟5ms;对于我5分钟的策略完全够了,毕竟ticker也得500ms一个。高频就算了,高频用什么python
import datetime
import threading
import numpy as np
import pandas as pd
from multiprocessing import Process, Pipe
# 数据收集进程,把自己写的数据查询收集的代码该进去;一旦数据ready就分发出去
def data_produce(pipe_list, pid):
data = np.ones((330, 10))
data = pd.DataFrame(data)
cnt = 0
while 1:
cnt += 1
if cnt>10: exit(0)
time.sleep(0.5)
for p in pipe_list:
p.send({'data':data, 'now':datetime.datetime.now().time()})
# 策略代码,接收新的数据;没接收到时会阻塞在pipe.recv()这里,收到后执行策略代码,开仓闭仓等
def data_analyse(pipe, pid):
while 1:
rec = pipe.recv()
now = datetime.datetime.now().time()
# 看下第一个和最后一个进程的发送接收延迟
if pid in ['analyse0', 'analyse15']:
print(rec['now'], now, "received", rec['data'].shape, pid)
if __name__ == "__main__":
num = 16
sends = []
recs = []
processes = []
# 这里是16个策略,共用同一个品种数据;所以创建16个数据分发的Pipe,每个Pipe有一个发送器和一个接收器
for i in range(num):
receive, send = Pipe()
sends.append(send)
recs.append(receive)
# 只有一个数据收集进程,把16个数据分发Pipe的发送器交给这个进程
processes.append(Process(target=data_produce, args=(sends, 'poducer')))
# 我们有16个策略进程,把16个数据分发Pipe的接收器依次交给每一个策略进程
for i in range(num):
processes.append(Process(target=data_analyse, args=(recs[i], 'analyse{}'.format(i))))
# 跑吧
for p in processes:
p.start()
for p in processes:
p.join()
1、我的实际代码中,是每个ticker都会分发一次,毕竟判断止损要时时刻刻判断;这个看需求随意设置了 2、如果不想出现“一核有难,七核围观”的盛况的话;python最好用多进程multiprocessing,而不是多线程threading;python的threading只是多个线程轮流调用一个进程的cpu资源而已。