测试版 如有BUG 欢迎提出。
1、2017.4.25 更新:
增加if (insDetail.MaxLimitOrderVolume == 0)
条件判断,有些期货公司服务器会返回0值,特此处理。共修改3处
【1】self.pollTask
【2】function Cover
【3】function Open
2、2020.10.20 更新: 增加CTA函数支持,逻辑移植自JavaScript版本商品期货交易类库CTA函数。 调用例子: “` def callBack_CTA(st): if len(st[“records”]) < 20: return emaSlow = TA.EMA(st[“records”], 20) emaFast = TA.EMA(st[“records”], 5) cross = ext.Cross(emaFast, emaSlow)
if st[“position”][“amount”] <= 0 and cross > 2: Log(“金叉周期”, cross, “当前持仓:”, st[“position”]) return 2 if st[“position”][“amount”] < 0 else 1 elif st[“position”][“amount”] >= 0 and cross < -2: Log(“死叉周期”, cross, “当前持仓:”, st[“position”]) return -2 if st[“position”][“amount”] > 0 else -1
ret = ext.CTA(“MA000/MA888,rb000/rb888,i000/i888”, callBack_CTA)
- 3、2021.07.14 更新:
if pos[“Type”] == PD_LONG or pos[“Type”] == PD_LONG_YD:
笔误
PD_LONG_YD写成了
PD_SHORT_YD”`
import json # json 模块
import types # 类型 模块
import platform # 版本信息
import traceback # 用于异常处理
# str() : ASCII and UTF-8
import time
# import sys
# reload(sys)
# sys.setdefaultencoding('utf8')
versionMainValue = None # 记录python 版本信息
isFirstCheck = True # 记录 是否是第一次检查
def CheckVersion(): # 检查python 版本
global versionMainValue,isFirstCheck
if(isFirstCheck == True):
platformInfo = platform.python_version()
if platformInfo[0] == '2':
Log("您使用的托管者 python编译环境的python版本是",platformInfo)
versionMainValue = 2
import sys
reload(sys)
sys.setdefaultencoding('utf8')
Log("import sys, reload(sys), sys.setdefaultencoding('utf8')")
elif platformInfo[0] == '3':
Log("您使用的托管者 python编译环境的python版本是",platformInfo)
versionMainValue = 3
else:
Log("其它版本")
isFirstCheck = False
def typeOfstr(str):
if str == "list":
if versionMainValue == 2:
return types.ListType
elif versionMainValue == 3:
return list
elif str == "int":
if versionMainValue == 2:
return types.IntType
elif versionMainValue == 3:
return int
elif str == "float":
if versionMainValue == 2:
return types.FloatType
elif versionMainValue == 3:
return float
else:
Log("error , typeOfstr used false")
def init():
if not 'SlideTick' in locals().keys():
SlideTick = 1
else:
SlideTick = int(SlideTick)
CheckVersion() # 检查python 版本
if IsVirtual():
exchange.GetRawJSON = VGetRawJSON
Log("回测系统中运行,已重写GetRawJSON。")
Log("商品期货交易类库加载成功")
def GetPosition(e, contractType, direction, positions = None):
allCost = 0
allAmount = 0
allProfit = 0
allFrozen = 0
posMargin = 0
if (not positions in dir()) or (not positions):
positions = _C(e.GetPosition)
for i in range(len(positions)):
if (positions[i]['ContractType'] == contractType and (((positions[i]['Type'] == PD_LONG or positions[i]['Type'] == PD_LONG_YD) and direction == PD_LONG) or ((positions[i]['Type'] == PD_SHORT or positions[i]['Type'] == PD_SHORT_YD) and direction == PD_SHORT))):
posMargin = positions[i]['MarginLevel']
allCost += positions[i]['Price'] * positions[i]['Amount']
allAmount += positions[i]['Amount']
allProfit += positions[i]['Profit']
allFrozen += positions[i]['FrozenAmount']
if allAmount == 0:
return
return {
"MarginLevel": posMargin,
"FrozenAmount": allFrozen,
"Price": _N(allCost / allAmount),
"Amount": allAmount,
"Profit": allProfit,
"Type": direction,
"ContractType": contractType
}
def Open(e, contractType, direction, opAmount):
initPosition = GetPosition(e, contractType, direction)
isFirst = True
initAmount = initPosition['Amount'] if initPosition else 0
positionNow = initPosition
while True:
needOpen = opAmount
if isFirst:
isFirst = False
else:
positionNow = GetPosition(e, contractType, direction)
if positionNow:
needOpen = opAmount - (positionNow['Amount'] - initAmount)
insDetail = _C(e.SetContractType, contractType)
# if (insDetail.MaxLimitOrderVolume == 0)
if insDetail["MaxLimitOrderVolume"] == 0:
# insDetail.MaxLimitOrderVolume = 50
insDetail["MaxLimitOrderVolume"] = 50
if needOpen < insDetail['MinLimitOrderVolume']:
break
depth = _C(e.GetDepth)
amount = min(insDetail['MaxLimitOrderVolume'], needOpen)
e.SetDirection("buy" if direction == PD_LONG else "sell")
orderId = None
if direction == PD_LONG:
orderId = e.Buy(depth['Asks'][0]['Price'] + (insDetail['PriceTick'] * SlideTick), min(amount, depth['Asks'][0]['Amount']), contractType, 'Ask', depth['Asks'][0])
else:
orderId = e.Sell(depth['Bids'][0]['Price'] - (insDetail['PriceTick'] * SlideTick), min(amount, depth['Bids'][0]['Amount']), contractType, 'Bid', depth['Bids'][0])
# CancelPendingOrders
while True:
Sleep(Interval)
orders = _C(e.GetOrders)
if len(orders) == 0:
break
for j in range(len(orders)):
e.CancelOrder(orders[j]['Id'])
if j < (len(orders) - 1):
Sleep(Interval)
ret = {
"price": 0,
"amount": 0,
"position": positionNow
}
if positionNow is None:
return ret
if initPosition is None:
ret['price'] = positionNow['Price']
ret['amount'] = positionNow['Amount']
else:
ret['amount'] = positionNow['Amount'] - initPosition['Amount']
ret['price'] = _N(((positionNow['Price'] * positionNow['Amount']) - (initPosition['Price'] * initPosition['Amount'])) / ret['amount'])
return ret
def Cover(e, contractType):
insDetail = _C(e.SetContractType, contractType)
# if (insDetail.MaxLimitOrderVolume == 0)
if insDetail["MaxLimitOrderVolume"] == 0:
# insDetail.MaxLimitOrderVolume = 50
insDetail["MaxLimitOrderVolume"] = 50
while True:
n = 0
positions = _C(e.GetPosition)
for i in range(len(positions)):
if positions[i]['ContractType'] != contractType:
continue
amount = min(insDetail['MaxLimitOrderVolume'], positions[i]['Amount'])
depth = None
if (positions[i]['Type'] == PD_LONG) or (positions[i]['Type'] == PD_LONG_YD):
depth = _C(e.GetDepth)
e.SetDirection("closebuy_today" if positions[i]['Type'] == PD_LONG else "closebuy")
e.Sell(depth['Bids'][0]['Price'] - (insDetail['PriceTick'] * SlideTick), min(amount, depth['Bids'][0]['Amount']), contractType, "平今" if positions[i]['Type'] == PD_LONG else "平昨", 'Bid', depth['Bids'][0])
n += 1
elif (positions[i]['Type'] == PD_SHORT) or (positions[i]['Type'] == PD_SHORT_YD):
depth = _C(e.GetDepth)
e.SetDirection("closesell_today" if positions[i]['Type'] == PD_SHORT else "closesell")
e.Buy(depth['Asks'][0]['Price'] + (insDetail['PriceTick'] * SlideTick), min(amount, depth['Asks'][0]['Amount']), contractType, "平今" if positions[i]['Type'] == PD_SHORT else "平昨", 'Asks', depth['Asks'][0])
n += 1
if n == 0:
break
while True:
Sleep(Interval)
orders = _C(e.GetOrders)
if len(orders) == 0:
break
for j in range(len(orders)):
e.CancelOrder(orders[j]['Id'])
if j < (len(orders) - 1):
Sleep(Interval)
trans = {
"AccountID": "投资者帐号",
"Available": "可用资金",
"Balance": "期货结算准备金",
"BrokerID": "经纪公司代码",
"CashIn": "资金差额",
"CloseProfit": "平仓盈亏",
"Commission": "手续费",
"Credit": "信用额度",
"CurrMargin": "当前保证金总额",
"CurrencyID": "币种代码",
"DeliveryMargin": "投资者交割保证金",
"Deposit": "入金金额",
"ExchangeDeliveryMargin": "交易所交割保证金",
"ExchangeMargin": "交易所保证金",
"FrozenCash": "冻结的资金",
"FrozenCommission": "冻结的手续费",
"FrozenMargin": "冻结的保证金",
"FundMortgageAvailable": "货币质押余额",
"FundMortgageIn": "货币质入金额",
"FundMortgageOut": "货币质出金额",
"Interest": "利息收入",
"InterestBase": "利息基数",
"Mortgage": "质押金额",
"MortgageableFund": "可质押货币金额",
"PositionProfit": "持仓盈亏",
"PreBalance": "上次结算准备金",
"PreCredit": "上次信用额度",
"PreDeposit": "上次存款额",
"PreFundMortgageIn": "上次货币质入金额",
"PreFundMortgageOut": "上次货币质出金额",
"PreMargin": "上次占用的保证金",
"PreMortgage": "上次质押金额",
"Reserve": "基本准备金",
"ReserveBalance": "保底期货结算准备金",
"SettlementID": "结算编号",
"SpecProductCloseProfit": "特殊产品持仓盈亏",
"SpecProductCommission": "特殊产品手续费",
"SpecProductExchangeMargin": "特殊产品交易所保证金",
"SpecProductFrozenCommission": "特殊产品冻结手续费",
"SpecProductFrozenMargin": "特殊产品冻结保证金",
"SpecProductMargin": "特殊产品占用保证金",
"SpecProductPositionProfit": "特殊产品持仓盈亏",
"SpecProductPositionProfitByAlg": "根据持仓盈亏算法计算的特殊产品持仓盈亏",
"TradingDay": "交易日",
"Withdraw": "出金金额",
"WithdrawQuota": "可取资金",
}
def AccountToTable(Str, title = '账户的信息'):
global trans
if (not title in dir()) or (not title):
title = '账户信息'
tbl = {'type': "table", 'title': title, 'cols': ["字段", "描述", "值"], 'rows': []}
try:
fields = json.loads(Str)
for k in fields:
if k == 'AccountID' or k == 'BrokerID':
continue
if k not in trans:
desc = '--'
else:
desc = trans[k]
v = fields[k]
if type(v) == typeOfstr('int') or type(v) == typeOfstr('float'):
v = _N(v, 5)
tbl['rows'].append([k, desc, v])
except:
Log(traceback.format_exc())
return tbl
# NewTaskQueue 类
class NewTaskQueue:
'模拟并发任务队列类'
NewTaskQueueCount = 0
def __init__(self, onTaskFinish = None):
self.ERR_SUCCESS = 0
self.ERR_SET_SYMBOL = 1
self.ERR_GET_RECORDS = 2
self.ERR_GET_ORDERS = 3
self.ERR_GET_POS = 4
self.ERR_TRADE = 5
self.ERR_GET_DEPTH = 6
self.ERR_NOT_TRADEING = 7
self.ERR_BUSY = 8
self.onTaskFinish = None if onTaskFinish is None else onTaskFinish
self.retryInterval = 300
self.tasks = []
def pushTask(self, e, symbol, action, amount, onFinish):
task = {
"e" : e,
"action" : action,
"symbol" : symbol,
"amount" : amount,
"init" : False,
"finished" : False,
"dealAmount" : 0,
"preAmount" : 0,
"preCost" : 0,
"retry" : 0,
"maxRetry" : 10,
"onFinish" : onFinish,
"desc" : ""
}
# 暂时不用字典映射
if task["action"] == "buy":
task["desc"] = task["symbol"] + " 开多仓,数量" + str(task["amount"])
elif task["action"] == "sell":
task["desc"] = task["symbol"] + " 开空仓,数量" + str(task["amount"])
elif task["action"] == "closebuy":
task["desc"] = task["symbol"] + " 平多仓,数量" + str(task["amount"])
elif task["action"] == "closesell":
task["desc"] = task["symbol"] + " 平空仓,数量" + str(task["amount"])
else:
task["desc"] = task["symbol"] + " " + task["action"] + ", 数量 " + str(task["amount"])
self.tasks.append(task)
Log("接收到任务", task["desc"])
def cancelAll(self, e):
while True:
orders = e.GetOrders()
if orders is None:
return self.ERR_GET_ORDERS
if len(orders) == 0:
break
for i in range(len(orders)):
e.CancelOrder(orders[i]["Id"])
Sleep(self.retryInterval)
return self.ERR_SUCCESS
def pollTask(self, task):
insDetail = task["e"].SetContractType(task["symbol"])
if insDetail is None:
return self.ERR_SET_SYMBOL
# if (insDetail.MaxLimitOrderVolume == 0)
if insDetail["MaxLimitOrderVolume"] == 0:
# insDetail.MaxLimitOrderVolume = 50
insDetail["MaxLimitOrderVolume"] = 50
ret = False
isCover = (task["action"] != "buy") and (task["action"] != "sell")
while True:
if not ext.IsTrading(task["symbol"]):
return self.ERR_NOT_TRADEING
Sleep(500)
ret = self.cancelAll(task["e"])
if ret != self.ERR_SUCCESS:
return ret
positions = task["e"].GetPosition()
if positions is None:
return self.ERR_GET_POS
pos = None
for i in range(len(positions)):
if (positions[i]["ContractType"] == task["symbol"] and (((positions[i]["Type"] == PD_LONG or positions[i]["Type"] == PD_LONG_YD) and (task["action"] == "buy" or task["action"] == "closebuy")) or ((positions[i]["Type"] == PD_SHORT or positions[i]["Type"] == PD_SHORT_YD) and (task["action"] == "sell" or task["action"] == "closesell")))):
if pos is None:
pos = positions[i]
pos["Cost"] = positions[i]["Price"] * positions[i]["Amount"]
else:
pos["Amount"] += positions[i]["Amount"]
pos["Profit"] += positions[i]["Profit"]
pos["Cost"] += positions[i]["Price"] * positions[i]["Amount"]
if not task["init"]:
task["init"] = True
if pos:
task["preAmount"] = pos["Amount"]
task["preCost"] = pos["Cost"]
else:
task["preAmount"] = 0
task["preCost"] = 0
if isCover:
Log("找不到仓位", task["symbol"], task["action"])
ret = None
break
remain = task["amount"]
if isCover and (pos is None):
pos = {"Amount": 0, "Cost": 0, "Price": 0}
if pos:
task["dealAmount"] = pos["Amount"] - task["preAmount"]
if isCover:
task["dealAmount"] = -task["dealAmount"]
remain = task["amount"] - task["dealAmount"]
if (remain <= 0 or task["retry"] >= task["maxRetry"]):
ret = {
"price" : (0 if task["dealAmount"] == 0 else (pos["Cost"] - task["preCost"]) / (pos["Amount"] - task["preAmount"])),
"amount" : (pos["Amount"] - task["preAmount"]),
"position" : pos
}
if isCover:
ret["amount"] = -ret["amount"]
if pos["Amount"] == 0:
ret["position"] = None
break
elif task["retry"] >= task["maxRetry"]:
ret = None
break
depth = task["e"].GetDepth()
if depth is None:
return self.ERR_GET_DEPTH
orderId = None
slidePrice = insDetail["PriceTick"] * SlideTick
if isCover:
for i in range(len(positions)):
if positions[i]["ContractType"] != task["symbol"]:
continue
if (int(remain) < 1):
break
amount = min(insDetail["MaxLimitOrderVolume"], positions[i]["Amount"], remain)
if (task["action"] == "closebuy" and (positions[i]["Type"] == PD_LONG or positions[i]["Type"] == PD_LONG_YD)):
task["e"].SetDirection("closebuy_today" if positions[i]["Type"] == PD_LONG else "closebuy")
amount = min(amount, depth["Bids"][0]["Amount"])
orderId = task["e"].Sell(_N(depth["Bids"][0]["Price"] - slidePrice, 2), amount, task["symbol"], "平今" if positions[i]["Type"] == PD_LONG else "平昨", "Bid", depth["Bids"][0])
remain -= amount
elif (task["action"] == "closesell" and (positions[i]["Type"] == PD_SHORT or positions[i]["Type"] == PD_SHORT_YD)):
task["e"].SetDirection("closesell_today" if positions[i]["Type"] == PD_SHORT else "closesell")
amount = min(amount, depth["Asks"][0]["Amount"])
orderId = task["e"].Buy(_N(depth["Asks"][0]["Price"] + slidePrice, 2), amount, task["symbol"], "平今" if positions[i]["Type"] == PD_SHORT else "平昨", "Ask", depth["Asks"][0])
remain -= amount
else:
if task["action"] == "buy":
task["e"].SetDirection("buy")
orderId = task["e"].Buy(_N(depth["Asks"][0]["Price"] + slidePrice, 2), min(remain, depth["Asks"][0]["Amount"]), task["symbol"], "Ask", depth["Asks"][0])
else :
task["e"].SetDirection("sell")
orderId = task["e"].Sell(_N(depth["Bids"][0]["Price"] - slidePrice, 2), min(remain, depth["Bids"][0]["Amount"]), task["symbol"], "Bid", depth["Bids"][0])
if orderId is None:
task["retry"] += 1
return self.ERR_TRADE
task["finished"] = True
if self.onTaskFinish:
self.onTaskFinish(task, ret)
if task["onFinish"]:
task["onFinish"](task, ret)
return self.ERR_SUCCESS
def poll(self):
processed = 0
# 迭代
for task in self.tasks :
if not task["finished"] :
processed += 1
self.pollTask(task)
if processed == 0:
self.tasks = []
def size(self):
return len(self.tasks)
'''
self.GetPosition = function(e, contractType, direction, positions) {
return GetPosition(e, contractType, direction, positions);
};
'''
def GetPosition(self, e, contractType, direction=None, positions=None):
return GetPosition(e, contractType, direction, positions)
'''
self.hasTask = function(symbol) {
if (typeof(symbol) !== 'string') {
return self.tasks.length > 0
}
for (var i = 0; i < self.tasks.length; i++) {
if (self.tasks[i].symbol == symbol && !self.tasks[i].finished) {
return true
}
}
return false
}
'''
def hasTask(self, symbol):
if type(symbol) != str:
return len(self.tasks) > 0
for task in self.tasks:
if task["symbol"] == symbol and not task["finished"]:
return True
return False
# NewPositionManager 类
class NewPositionManager:
'非并发交易控制类'
NewPositionManagerCount = 0
# 构造函数
def __init__(self, e):
self.e = e
self.account = None
# Account
def Account(self):
if self.account is None:
self.account = _C(self.e.GetAccount)
return self.account
# GetAccount
def GetAccount(self, getTable = False):
self.account = _C(self.e.GetAccount)
if (not getTable in dir() and getTable):
return AccountToTable(self.e.GetRawJSON())
return self.account
# GetPosition
def GetPosition(self, contractType, direction, positions = None):
return GetPosition(self.e, contractType, direction, positions)
# OpenLong
def OpenLong(self, contractType, shares):
if self.account is None:
self.account = _C(self.e.GetAccount)
return Open(self.e, contractType, PD_LONG, shares)
# OpenShort
def OpenShort(self, contractType, shares):
if self.account is None:
self.account = _C(self.e.GetAccount)
return Open(self.e, contractType, PD_SHORT, shares)
# Cover
def Cover(self, contractType):
if self.account is None:
self.account = _C(self.e.GetAccount)
return Cover(self.e, contractType)
# CoverAll
def CoverAll(self):
if self.account is None:
self.account = _C(self.e.GetAccount)
while True:
positions = _C(self.e.GetPosition)
if len(positions) == 0:
break
for i in range(len(positions)):
if '&' not in positions[i]['ContractType']:
Cover(self.e, positions[i]['ContractType'])
Sleep(1000)
# Profit
def Profit(self): # contractType JS 版本有该参数
accountNow = _C(self.e.GetAccount)
return _N(accountNow.Balance - self.account.Balance)
# NewPositionManager END
# 导出函数实现
def CreateNewPositionManager(e = exchange): # 导出函数实现
if e not in exchanges:
raise Exception("error exchange", e)
if (versionMainValue != 3 and e.GetName() != 'Futures_CTP') or (versionMainValue == 3 and e.GetName() != 'Futures_CTP'):
raise Exception("error exchange, 本模板适用于CTP商品期货,当前添加的交易所为:", e.GetName());
obj_NewPositionManager = NewPositionManager(e)
return obj_NewPositionManager
def IsTrading(symbol):
now = time.time()
tup_localtime = time.localtime(now)
day = tup_localtime.tm_wday # tm_wday : week 0~6 , 0 is monday
hour = tup_localtime.tm_hour # tm_hour : 0~23
minute = tup_localtime.tm_min # tm_min : 0~59
if (day == 6 or (day == 5 and (hour > 2 or hour == 2 and minute > 30))):
return False
shortName = "" # i , p
for i in range(len(symbol)):
ch = symbol[i]
if ch.isdigit(): # ch >= 48 and ch <= 57:
break
shortName += symbol[i].upper()
period = [
[9, 0, 10, 15],
[10, 30, 11, 30],
[13, 30, 15, 0]
]
if (shortName == "IH" or shortName == "IF" or shortName == "IC"):
period = [
[9, 30, 11, 30],
[13, 0, 15, 0]
]
elif (shortName == "TF" or shortName == "T" or shortName == "TS"):
period = [
[9, 30, 11, 30],
[13, 0, 15, 15]
]
if day >= 0 and day <= 4:
for i in range(len(period)):
p = period[i]
if ((hour > p[0] or (hour == p[0] and minute >= p[1])) and (hour < p[2] or (hour == p[2] and minute < p[3]))):
return True
''' 参考JavaScript版本合约品种交易时间
var nperiod = [
[
['sc', 'ag', 'au'],
[21, 0, 2, 30]
],
[
['nr', 'bu', 'hc', 'rb', 'ru', 'sp', 'fu', 'ZC', 'CF', 'CY', 'FG', 'MA', 'PF', 'OI', 'RM', 'SR',
'TA', 'SA', 'a', 'b', 'c', 'cs', 'i', 'j', 'jm', 'l', 'm', 'p', 'pp', 'v', 'y', 'eg', 'rr', 'eb', 'pg', 'lu',
'PX', 'br', 'PR', 'SH'
],
[21, 0, 23, 0]
],
[
['al', 'cu', 'ni', 'pb', 'sn', 'zn', 'ss', 'bc', 'ao'],
[21, 0, 1, 0]
]
];
'''
nperiod = [
[
['SC', 'AU', 'AG'],
[21, 0, 2, 30] # 此处修改为 2
],
[
['CU', 'AL', 'ZN', 'PB', 'SN', 'NI', 'SS', 'BC', 'AO'],
[21, 0, 1, 0] # 此处修改为 1
],
[
['NR', 'BU', 'HC', 'RB', 'RU', 'SP', 'FU', 'ZC', 'CF', 'CY', 'FG', 'MA', 'PF', 'OI', 'RM', 'SR', 'TA', 'SA', 'A', 'B', 'C', 'CS', 'I', 'J', 'JM', 'L', 'M', 'P', 'PP', 'V', 'Y', 'EG', 'RR', 'EB', 'PG', 'LU', 'PX', 'BR', 'PR', 'SH'],
[21, 0, 23, 0]
]
]
for i in range(len(nperiod)):
for j in range(len(nperiod[i][0])):
if nperiod[i][0][j] == shortName:
p = nperiod[i][1]
condA = hour > p[0] or (hour == p[0] and minute >= p[1])
condB = hour < p[2] or (hour == p[2] and minute < p[3])
# in one day
if p[2] >= p[0]:
if ((day >= 0 and day <= 4) and condA and condB):
return True
else:
if (((day >= 0 and day <= 4) and condA) or ((day >= 1 and day <= 5) and condB)):
return True
return False
return False
def CreateNewTaskQueue(onTaskFinish = None):
obj_NewTaskQueue = NewTaskQueue(onTaskFinish)
return obj_NewTaskQueue
# 合约名称转换产品名称
def ins2product(symbol):
if len(symbol) > 6 and symbol.find(' ') == -1:
return symbol
symbol = symbol.replace("SPD ", "").replace("SP ", "")
shortName = ""
for ch in symbol:
if ch >= chr(48) and ch <= chr(57):
break
shortName += ch
return shortName
# CTA函数
def CTA(contractType, onTick, interval = 500):
SetErrorFilter("login|ready|初始化") # login|ready|初始化 过滤三种错误
exchange.IO("mode", 0)
lastUpdate = 0
e = exchange
symbols = contractType.split(",")
holds = {}
tblAccount = {}
def findChartSymbol(ct):
product = ins2product(ct)
for i in symbols:
tmp = i.split("/")
if ins2product(tmp[-1]) == product:
return tmp[0]
return None
def refreshHold(qSize=0):
while not e.IO("status"):
LogStatus(_D(), "Not connect")
Sleep(1000)
for ins in symbols:
tmp = ins.split("/")
if len(tmp) == 2:
holds[tmp[0]] = {
"price" : 0,
"value" : 0,
"amount" : 0,
"profit" : 0,
"symbol" : tmp[1]
}
else :
holds[ins] = {
"price" : 0,
"value" : 0,
"amount" : 0,
"profit" : 0,
"symbol" : ins,
}
positions = _C(e.GetPosition)
for pos in positions:
mapCT = findChartSymbol(pos["ContractType"])
if not mapCT:
continue
if mapCT not in holds:
continue
hold = holds[mapCT]
if pos["Type"] == PD_LONG or pos["Type"] == PD_LONG_YD:
if hold["amount"] < 0 and qSize == 0:
raise "不能同时持有多仓空仓"
hold["amount"] += pos["Amount"]
else :
if hold["amount"] > 0 and qSize == 0:
raise "不能同时持有多仓空仓"
hold["amount"] -= pos["Amount"]
hold["value"] += pos["Price"] * pos["Amount"]
hold["profit"] += pos["Profit"]
if hold["amount"] != 0:
hold["price"] = _N(hold["value"] / abs(hold["amount"]))
account = _C(e.GetAccount)
if CTAShowPosition:
tblPosition = {
"type" : "table",
"title" : "持仓状态",
"cols" : ["品种", "方向", "均价", "数量", "浮动盈亏"],
"rows" : [],
}
for pos in positions:
# tblPosition.rows.push([pos.ContractType, ((pos.Type == PD_LONG || pos.Type == PD_LONG_YD) ? '多#0000ff' : '空#ff0000'), pos.Price, pos.Amount, pos.Profit])
tblPosition["rows"].append([pos["ContractType"], "多#0000ff" if (pos["Type"] == PD_LONG or pos["Type"] == PD_LONG_YD) else "空#ff0000", pos["Price"], pos["Amount"], pos["Profit"]])
tblAccount = ext.AccountToTable(e.GetRawJSON(), "资金信息")
LogStatus("`" + json.dumps([tblPosition, tblAccount]) + "`\n", "更新于:" + _D())
lastUpdate = time.time() * 1000
return account
account = refreshHold(0)
def callBack_NewTaskQueue(task, ret):
Log("任务结束", task["desc"])
account = refreshHold(q.size())
q = ext.NewTaskQueue(callBack_NewTaskQueue)
mainCache = {}
while True:
ts = time.time() * 1000
for ins in symbols:
ctChart = ins
ctTrade = ins
tmp = ins.split("/")
if len(tmp) == 2:
ctChart = tmp[0]
ctTrade = tmp[1]
if not e.IO("status") or not ext.IsTrading(ctChart) or not ext.IsTrading(ctTrade) or q.hasTask(ctTrade):
continue
# 正在移仓
if ctTrade in mainCache and (q.hasTask(mainCache[ctTrade][0]) or q.hasTask(mainCache[ctTrade][1])):
continue
c = e.SetContractType(ctChart)
if not c:
continue
r = e.GetRecords()
if r is None or len(r) == 0:
continue
insDetail = e.SetContractType(ctTrade)
if not insDetail:
continue
tradeSymbol = insDetail["InstrumentID"]
if ctTrade.find("888") != -1 or ctTrade.find("000") != -1:
preMain = ""
isSwitch = False
positions = None
if ctTrade not in mainCache:
if not IsVirtual():
Log(ctTrade, "当前主力合约为:", tradeSymbol)
positions = e.GetPosition()
if positions is None:
continue
product = ins2product(ctTrade)
for p in positions:
if ins2product(p["ContractType"]) == product:
mainCache[ctTrade] = [p["ContractType"], p["ContractType"]]
if ctTrade in mainCache and mainCache[ctTrade][0] != tradeSymbol:
preMain = mainCache[ctTrade][0]
Log(ctTrade, "主力合约切换为:", tradeSymbol, "之前为:", preMain, "#ff0000")
if positions is None:
positions = e.GetPosition()
if positions is None:
continue
for p in positions:
if p["ContractType"] == preMain:
isLong = p["Type"] == PD_LONG or p["Type"] == PD_LONG_YD
def callBack_pushTaskCover(task, ret):
Log("切换合约平仓成功", task["desc"], ret)
def callBack_pushTaskOpen(task, ret):
Log("切换合约开仓成功", task["desc"], ret)
q.pushTask(e, p["ContractType"], "closebuy" if isLong else "closesell", p["Amount"], callBack_pushTaskCover)
q.pushTask(e, tradeSymbol, "buy" if isLong else "sell", p["Amount"], callBack_pushTaskOpen)
isSwitch = True
mainCache[ctTrade] = [tradeSymbol, preMain]
if isSwitch:
Log("开始移仓", preMain, "移到", tradeSymbol)
continue
hold = holds[ctChart]
n = onTick({
"records" : r,
"symbol" : tradeSymbol,
"detail" : insDetail,
"account" : account,
"position" : hold,
"positions" : holds,
})
callBack = None
if type(n) == typeOfstr("list") and type(len(n)) == typeOfstr("int") and len(n) > 1:
if type(n[1]) == types.FunctionType:
callBack = n[1]
n = n[0]
if type(n) != typeOfstr("int") and type(n) != typeOfstr("float"):
continue
ret = None
if n > 0:
if hold["amount"] < 0:
q.pushTask(e, tradeSymbol, "closesell", min(-hold["amount"], n), callBack)
n += hold["amount"]
if n > 0:
q.pushTask(e, tradeSymbol, "buy", n, callBack)
elif n < 0:
if hold["amount"] > 0:
q.pushTask(e, tradeSymbol, "closebuy", min(hold["amount"], -n), callBack)
n += hold["amount"]
if n < 0:
q.pushTask(e, tradeSymbol, "sell", -n, callBack)
elif n == 0 and hold["amount"] != 0:
q.pushTask(e, tradeSymbol, "closebuy" if hold["amount"] > 0 else "closesell", abs(hold["amount"]), callBack)
q.poll()
now = time.time() * 1000
if now - lastUpdate > SyncInterval * 1000:
account = refreshHold(q.size())
delay = interval - (now - ts)
if delay > 0:
Sleep(delay)
def Cross(array1, array2):
if len(array1) != len(array2):
raise "array length not equal"
arr1 = array1.copy()
arr2 = array2.copy()
n = 0
arr1.reverse()
arr2.reverse()
for i in range(len(arr1)):
if arr1[i] is None or arr2[i] is None:
break
if arr1[i] < arr2[i]:
if n > 0:
break
n -= 1
elif arr1[i] > arr2[i]:
if n < 0:
break
n += 1
else :
break
return n
# 导出函数
ext.NewPositionManager = CreateNewPositionManager
ext.IsTrading = IsTrading
ext.AccountToTable = AccountToTable
ext.NewTaskQueue = CreateNewTaskQueue
ext.CTA = CTA
ext.Cross = Cross
# 测试 IsVirtual() 判断是否是回测。
def VGetRawJSON(): # 模拟 GetRawJSON 函数 ,仅测试使用。
nowTime = time.time()
DnowTime = _D(nowTime)
dict1 = {"AccountID": "073997", "Available": 1331.445464656656, "Balance": 1331.3344567, "BrokerID": "9999", "time": nowTime, "_D": DnowTime, "CurrMargin": 0}
dict1Str = json.dumps(dict1)
return dict1Str
def main():
'''
# 测试 AccountToTable
Log("测试 AccountToTable 函数 ")
Str = '{"AccountID": "073997", "Available": 1331.445464656656, "Balance": 1331.3344567, "BrokerID": "9999", "CashIn": 0, "XXX": "dd"}'
table = AccountToTable(Str)
Log(json.dumps(table))
LogStatus('`' + json.dumps(table) + '`')
# 测试 IsTrading
Log("now time", _D(), "isTrading('MA701'): ", ext.IsTrading("MA701"))
Log("now time", _D(), "isTrading('SR701'): ", ext.IsTrading("SR701"))
Log("now time", _D(), "isTrading('jd1701'): ", ext.IsTrading("jd1701"))
# 测试 NewPositionManager 导出函数 生成对象
obj = ext.NewPositionManager()
Log(obj.Account(), obj.account)
# 测试 NewPositionManager 类 实例的成员函数 GetAccount
retGetAccount = obj.GetAccount(True)
Log(retGetAccount)
LogStatus('`' + json.dumps(retGetAccount) + '`')
# 测试 OpenLong 、 OpenShort 、GetPosition
open_long = obj.OpenLong("MA701", 2)
open_short1 = obj.OpenShort("SR701", 3)
open_short2 = obj.OpenShort("jd1701", 4)
Log("open_long:", open_long)
Log("open_short1:", open_short1)
Log("open_short2:", open_short2)
positions = obj.GetPosition("MA701", PD_SHORT)
Log("get MA701 PD_SHORT:", positions)
positions = obj.GetPosition("MA701", PD_LONG)
Log("get MA701 PD_LONG:", positions)
# 测试 CoverAll
obj.Cover("MA701")
# 测试 CoverAll
obj.CoverAll()
# 读取 Cover 、CoverAll 后的 持仓信息
positions = exchange.GetPosition()
Log("now Positions:", positions)
# 测试 GetAccount 、 Profit
# Log("Account:", obj.GetAccount())
Log("Profit:", obj.Profit())
Log("account", obj.Account())
Log("Account:", obj.GetAccount())
# 测试 IsTrading
Log(ext.IsTrading("MA701"))
# 测试 ext.AccountToTable
Str = '{"AccountID": "000", "Available": 55, "Balance": 55, "BrokerID": "9999", "CashIn": 0, "ZZZ": "YHU"}'
table = ext.AccountToTable(Str)
Log(json.dumps(table))
LogStatus('`' + json.dumps(table) + '`')
Log("-------------------------------------------------------------")
'''
# 测试 NewTaskQueue
'''
q = ext.NewTaskQueue()
def NoName2(task, ret):
Log(task["desc"], ret, "#FF0000")
def NoName1(task, ret):
Log(task["desc"], ret, "#FF0000")
if ret:
q.pushTask(exchange, "MA701", "closebuy", 1, NoName2)
q.pushTask(exchange, "MA701", "buy", 3, lambda task, ret: Log(task["desc"], ret, q.pushTask(exchange, "MA701", "closebuy", 1, lambda task, ret: Log(task["desc"], ret, "#FF0000")) if ret else "", "#FF0000"))
Log("q.tasks`s length :", q.size())
while True:
q.poll()
Sleep(1000)
'''
# 测试CTA函数
def callBack_CTA(st):
if len(st["records"]) < 20:
return
emaSlow = TA.EMA(st["records"], 20)
emaFast = TA.EMA(st["records"], 5)
cross = ext.Cross(emaFast, emaSlow)
if st["position"]["amount"] <= 0 and cross > 2:
Log("金叉周期", cross, "当前持仓:", st["position"])
return 2 if st["position"]["amount"] < 0 else 1
elif st["position"]["amount"] >= 0 and cross < -2:
Log("死叉周期", cross, "当前持仓:", st["position"])
return -2 if st["position"]["amount"] > 0 else -1
ret = ext.CTA("MA000/MA888,rb000/rb888,i000/i888", callBack_CTA)