资源加载中... loading...

python版CTP商品期货交易类库(支持CTA函数测试版)

Author: 雨幕(youquant), Date: 2020-10-19 10:19:51
Tags:

python版CTP商品期货交易类库

本类库为测试版,如发现 BUG,欢迎反馈。

  • 1、2017.4.25 更新: 增加if (insDetail.MaxLimitOrderVolume == 0)条件判断,有些期货公司服务器会返回0值,特此处理。共修改3处 【1】self.pollTask 【2】function Cover 【3】function Open

  • 2、2020.10.20 更新: 增加CTA函数支持,逻辑移植自JavaScript版本商品期货交易类库CTA函数。 调用例子:

    def callBack_CTA(st):
        if len(st["records"]) < 20:
            return 
        emaSlow = TA.EMA(st["records"], 20)
        emaFast = TA.EMA(st["records"], 5)
        cross = ext.Cross(emaFast, emaSlow)
        
        if st["position"]["amount"] <= 0 and cross > 2:
            Log("金叉周期", cross, "当前持仓:", st["position"])
            return 2 if st["position"]["amount"] < 0 else 1
        elif st["position"]["amount"] >= 0 and cross < -2:
            Log("死叉周期", cross, "当前持仓:", st["position"])
            return -2 if st["position"]["amount"] > 0 else -1  
    
    ret = ext.CTA("MA000/MA888,rb000/rb888,i000/i888", callBack_CTA)
    
  • 3、2021.07.14 更新:

    if pos["Type"] == PD_LONG or pos["Type"] == PD_LONG_YD:
    

    修正笔误:上面条件中的 PD_LONG_YD 此前被误写成了 PD_SHORT_YD。

  • 4、2024.09.20 增加新品种夜盘代码: ‘AO’, ‘PX’, ‘BR’, ‘PR’, ‘SH’


import json       # json 模块
import types      # 类型  模块
import platform   # 版本信息
import traceback  # 用于异常处理
# str() :  ASCII and UTF-8
import time
# import sys
# reload(sys)
# sys.setdefaultencoding('utf8')

versionMainValue = None  # Major Python version (2 or 3) set by CheckVersion(); stays None until detected
isFirstCheck = True      # True until CheckVersion() has run its one-time detection

def CheckVersion():
    """Detect the interpreter's major version once and record it.

    On the first call the result (2 or 3) is stored in the module-level
    ``versionMainValue``; for any other version it is left untouched and a
    message is logged.  On Python 2 the default string encoding is also
    switched to UTF-8.  Later calls do nothing.
    """
    global versionMainValue, isFirstCheck
    if isFirstCheck == True:
        fullVersion = platform.python_version()
        majorDigit = fullVersion[0]
        if majorDigit == '3':
            Log("您使用的托管者 python编译环境的python版本是", fullVersion)
            versionMainValue = 3
        elif majorDigit == '2':
            Log("您使用的托管者 python编译环境的python版本是", fullVersion)
            versionMainValue = 2
            # Python 2 only: `reload` is a builtin there and re-exposes
            # sys.setdefaultencoding, which site.py removes at startup.
            import sys
            reload(sys)
            sys.setdefaultencoding('utf8')
            Log("import sys, reload(sys), sys.setdefaultencoding('utf8')")
        else:
            Log("其它版本")
    isFirstCheck = False

def typeOfstr(str):
    """Map a type name to the matching built-in type object.

    Args:
        str: one of "list", "int" or "float".

    Returns:
        ``list``, ``int`` or ``float``.  For any other name an error is logged
        and None is returned.

    The built-in names are used directly: on Python 2 they are the same
    objects as ``types.ListType`` / ``types.IntType`` / ``types.FloatType``,
    so the result no longer depends on CheckVersion() having run first
    (previously None was returned silently when the version was unknown).
    """
    if str == "list":
        return list
    if str == "int":
        return int
    if str == "float":
        return float
    Log("error , typeOfstr used false")


def init():
    """Initialise the library: normalise SlideTick, detect the Python version
    and install the mock GetRawJSON when running in the backtest system.

    ``SlideTick`` (number of price ticks of slippage used by Open/Cover and the
    task queue) is read as a module-level name.  The previous code tested
    ``locals()`` and assigned a function-local variable, so the global setting
    was never touched; it is now declared ``global`` and defaults to 1 when
    the name is not defined anywhere.
    """
    global SlideTick
    try:
        configured = SlideTick
    except NameError:
        # No such setting supplied by the host environment: fall back to 1 tick.
        configured = 1
    SlideTick = int(configured)

    CheckVersion()   # detect the Python version

    if IsVirtual():
        # Backtest: replace GetRawJSON with the local mock implementation.
        exchange.GetRawJSON = VGetRawJSON
        Log("回测系统中运行,已重写GetRawJSON。")

    Log("商品期货交易类库加载成功")

def GetPosition(e, contractType, direction, positions = None):
    """Aggregate today's and yesterday's holdings of one contract and direction.

    Args:
        e: exchange object; ``e.GetPosition`` is only called when ``positions``
            is None.
        contractType: contract code to match against ``ContractType``.
        direction: PD_LONG or PD_SHORT.
        positions: optional position list to use instead of querying the
            exchange.  (Previously this argument was always discarded because
            ``positions in dir()`` can never be true for a list.)

    Returns:
        None when the matched amount is 0, otherwise a dict with the summed
        ``Amount``, ``Profit`` and ``FrozenAmount``, the amount-weighted
        average ``Price``, the ``MarginLevel`` of the last matching entry, and
        ``Type`` / ``ContractType`` echoing the arguments.
    """
    if positions is None:
        positions = _C(e.GetPosition)

    allCost = 0
    allAmount = 0
    allProfit = 0
    allFrozen = 0
    posMargin = 0
    for pos in positions:
        if pos['ContractType'] != contractType:
            continue
        isLong = pos['Type'] == PD_LONG or pos['Type'] == PD_LONG_YD
        isShort = pos['Type'] == PD_SHORT or pos['Type'] == PD_SHORT_YD
        if not ((isLong and direction == PD_LONG) or (isShort and direction == PD_SHORT)):
            continue
        posMargin = pos['MarginLevel']
        allCost += pos['Price'] * pos['Amount']
        allAmount += pos['Amount']
        allProfit += pos['Profit']
        allFrozen += pos['FrozenAmount']

    if allAmount == 0:
        return
    return {
        "MarginLevel": posMargin,
        "FrozenAmount": allFrozen,
        "Price": _N(allCost / allAmount),
        "Amount": allAmount,
        "Profit": allProfit,
        "Type": direction,
        "ContractType": contractType
    }

def Open(e, contractType, direction, opAmount):
    """Open ``opAmount`` lots of ``contractType`` and block until done.

    Each round places one order at the best opposite quote plus/minus
    ``SlideTick`` price ticks, waits ``Interval``, cancels whatever is still
    pending and re-reads the position.  The loop ends once the remaining
    amount is below the contract's ``MinLimitOrderVolume``.

    Args:
        e: exchange object.
        contractType: contract code.
        direction: PD_LONG to buy, anything else sells (PD_SHORT expected).
        opAmount: number of lots to add.

    Returns:
        dict with ``price`` (average price of the lots added), ``amount``
        (lots added) and ``position`` (aggregated position from GetPosition,
        or None).
    """
    initPosition = GetPosition(e, contractType, direction)
    isFirst = True
    initAmount = initPosition['Amount'] if initPosition else 0
    positionNow = initPosition
    while True:
        needOpen = opAmount
        if isFirst:
            isFirst = False
        else:
            positionNow = GetPosition(e, contractType, direction)
            if positionNow:
                needOpen = opAmount - (positionNow['Amount'] - initAmount)
        insDetail = _C(e.SetContractType, contractType)
        # Some brokers' servers report 0 here; fall back to 50 lots per order.
        if insDetail["MaxLimitOrderVolume"] == 0:
            insDetail["MaxLimitOrderVolume"] = 50
        if needOpen < insDetail['MinLimitOrderVolume']:
            break
        depth = _C(e.GetDepth)
        amount = min(insDetail['MaxLimitOrderVolume'], needOpen)
        e.SetDirection("buy" if direction == PD_LONG else "sell")
        # NOTE(review): the arguments after the amount look like extra
        # information for the order log - confirm against the platform API.
        if direction == PD_LONG:
            e.Buy(depth['Asks'][0]['Price'] + (insDetail['PriceTick'] * SlideTick), min(amount, depth['Asks'][0]['Amount']), contractType, 'Ask', depth['Asks'][0])
        else:
            e.Sell(depth['Bids'][0]['Price'] - (insDetail['PriceTick'] * SlideTick), min(amount, depth['Bids'][0]['Amount']), contractType, 'Bid', depth['Bids'][0])
        # Cancel every order that is still pending before the next round.
        while True:
            Sleep(Interval)
            orders = _C(e.GetOrders)
            if len(orders) == 0:
                break
            for j in range(len(orders)):
                e.CancelOrder(orders[j]['Id'])
                if j < (len(orders) - 1):
                    Sleep(Interval)
    ret = {
        "price": 0,
        "amount": 0,
        "position": positionNow
    }
    if positionNow is None:
        return ret
    if initPosition is None:
        ret['price'] = positionNow['Price']
        ret['amount'] = positionNow['Amount']
    else:
        ret['amount'] = positionNow['Amount'] - initPosition['Amount']
        # Nothing was added (e.g. opAmount below the minimum order volume):
        # keep price at 0 instead of dividing by zero.
        if ret['amount'] != 0:
            ret['price'] = _N(((positionNow['Price'] * positionNow['Amount']) - (initPosition['Price'] * initPosition['Amount'])) / ret['amount'])
    return ret

def Cover(e, contractType):
    """Close every long and short holding of ``contractType``, blocking until
    the exchange reports no closable entry for that contract.

    Each pass submits one closing order per matching position entry (today's
    and yesterday's holdings use different directions), then waits
    ``Interval`` and cancels whatever is still pending.
    """
    insDetail = _C(e.SetContractType, contractType)
    # Some brokers' servers report 0 here; fall back to 50 lots per order.
    if insDetail["MaxLimitOrderVolume"] == 0:
        insDetail["MaxLimitOrderVolume"] = 50
    while True:
        submitted = 0
        for pos in _C(e.GetPosition):
            if pos['ContractType'] != contractType:
                continue
            lots = min(insDetail['MaxLimitOrderVolume'], pos['Amount'])
            posType = pos['Type']
            if posType == PD_LONG or posType == PD_LONG_YD:
                isToday = posType == PD_LONG
                depth = _C(e.GetDepth)
                e.SetDirection("closebuy_today" if isToday else "closebuy")
                bestBid = depth['Bids'][0]
                e.Sell(bestBid['Price'] - (insDetail['PriceTick'] * SlideTick), min(lots, bestBid['Amount']), contractType, "平今" if isToday else "平昨", 'Bid', bestBid)
                submitted += 1
            elif posType == PD_SHORT or posType == PD_SHORT_YD:
                isToday = posType == PD_SHORT
                depth = _C(e.GetDepth)
                e.SetDirection("closesell_today" if isToday else "closesell")
                bestAsk = depth['Asks'][0]
                e.Buy(bestAsk['Price'] + (insDetail['PriceTick'] * SlideTick), min(lots, bestAsk['Amount']), contractType, "平今" if isToday else "平昨", 'Asks', bestAsk)
                submitted += 1
        if submitted == 0:
            break
        # Cancel every order that is still pending before re-reading positions.
        while True:
            Sleep(Interval)
            pending = _C(e.GetOrders)
            if len(pending) == 0:
                break
            lastIdx = len(pending) - 1
            for idx, order in enumerate(pending):
                e.CancelOrder(order['Id'])
                if idx < lastIdx:
                    Sleep(Interval)


# Chinese descriptions for the account fields shown by AccountToTable();
# keys are the field names found in the exchange's raw account JSON.
trans = {
    "AccountID": "投资者帐号",
    "Available": "可用资金",
    "Balance": "期货结算准备金",
    "BrokerID": "经纪公司代码",
    "CashIn": "资金差额",
    "CloseProfit": "平仓盈亏",
    "Commission": "手续费",
    "Credit": "信用额度",
    "CurrMargin": "当前保证金总额",
    "CurrencyID": "币种代码",
    "DeliveryMargin": "投资者交割保证金",
    "Deposit": "入金金额",
    "ExchangeDeliveryMargin": "交易所交割保证金",
    "ExchangeMargin": "交易所保证金",
    "FrozenCash": "冻结的资金",
    "FrozenCommission": "冻结的手续费",
    "FrozenMargin": "冻结的保证金",
    "FundMortgageAvailable": "货币质押余额",
    "FundMortgageIn": "货币质入金额",
    "FundMortgageOut": "货币质出金额",
    "Interest": "利息收入",
    "InterestBase": "利息基数",
    "Mortgage": "质押金额",
    "MortgageableFund": "可质押货币金额",
    "PositionProfit": "持仓盈亏",
    "PreBalance": "上次结算准备金",
    "PreCredit": "上次信用额度",
    "PreDeposit": "上次存款额",
    "PreFundMortgageIn": "上次货币质入金额",
    "PreFundMortgageOut": "上次货币质出金额",
    "PreMargin": "上次占用的保证金",
    "PreMortgage": "上次质押金额",
    "Reserve": "基本准备金",
    "ReserveBalance": "保底期货结算准备金",
    "SettlementID": "结算编号",
    # Close profit, not position profit (was a copy of SpecProductPositionProfit).
    "SpecProductCloseProfit": "特殊产品平仓盈亏",
    "SpecProductCommission": "特殊产品手续费",
    "SpecProductExchangeMargin": "特殊产品交易所保证金",
    "SpecProductFrozenCommission": "特殊产品冻结手续费",
    "SpecProductFrozenMargin": "特殊产品冻结保证金",
    "SpecProductMargin": "特殊产品占用保证金",
    "SpecProductPositionProfit": "特殊产品持仓盈亏",
    "SpecProductPositionProfitByAlg": "根据持仓盈亏算法计算的特殊产品持仓盈亏",
    "TradingDay": "交易日",
    "Withdraw": "出金金额",
    "WithdrawQuota": "可取资金",
}

def AccountToTable(Str, title = '账户的信息'):
    """Turn the raw account JSON string into a status-bar table dict.

    Args:
        Str: JSON object text; every key except ``AccountID`` and ``BrokerID``
            becomes one row ``[field, description, value]``.
        title: table title.  A falsy title falls back to '账户信息'.
            (Previously any title passed in was overwritten, because
            ``title in dir()`` is never true for an ordinary title string.)

    Returns:
        dict with ``type``, ``title``, ``cols`` and ``rows``.  If parsing or
        formatting fails the traceback is logged and the rows collected so
        far are returned.
    """
    global trans
    if not title:
        title = '账户信息'
    tbl = {'type': "table", 'title': title, 'cols': ["字段", "描述", "值"], 'rows': []}
    try:
        fields = json.loads(Str)
        for k in fields:
            if k == 'AccountID' or k == 'BrokerID':
                continue
            desc = trans[k] if k in trans else '--'
            v = fields[k]
            # Exact type match on purpose: booleans must not be rounded.
            if type(v) == int or type(v) == float:
                v = _N(v, 5)
            tbl['rows'].append([k, desc, v])
    except Exception:
        Log(traceback.format_exc())
    return tbl

# NewTaskQueue 类
class NewTaskQueue:
    """Task queue that works through open/close orders one task at a time.

    Tasks are plain dicts created by pushTask(); poll() runs every unfinished
    task through pollTask().  pollTask() and cancelAll() return one of the
    ERR_* codes defined in __init__.
    """
    NewTaskQueueCount = 0

    def __init__(self, onTaskFinish = None):
        """Create an empty queue.

        Args:
            onTaskFinish: optional callable ``(task, ret)`` invoked whenever
                any task finishes, before the task's own ``onFinish``.
        """
        # Status codes returned by cancelAll() / pollTask().
        self.ERR_SUCCESS = 0
        self.ERR_SET_SYMBOL = 1
        self.ERR_GET_RECORDS = 2
        self.ERR_GET_ORDERS = 3
        self.ERR_GET_POS = 4
        self.ERR_TRADE = 5
        self.ERR_GET_DEPTH = 6
        self.ERR_NOT_TRADEING = 7
        self.ERR_BUSY = 8

        self.onTaskFinish = onTaskFinish
        self.retryInterval = 300   # value handed to Sleep() after each cancel request
        self.tasks = []

    def pushTask(self, e, symbol, action, amount, onFinish):
        """Append a task to the queue and log it.

        Args:
            e: exchange object the task trades on.
            symbol: contract code.
            action: "buy" / "sell" open a position, "closebuy" / "closesell"
                close one; pollTask treats any other value as a close.
            amount: number of lots.
            onFinish: optional callable ``(task, ret)`` run when the task ends.
        """
        task = {
            "e" : e,
            "action" : action,
            "symbol" : symbol,
            "amount" : amount,
            "init" : False,
            "finished" : False,
            "dealAmount" : 0,
            "preAmount" : 0,
            "preCost" : 0,
            "retry" : 0,
            "maxRetry" : 10,
            "onFinish" : onFinish,
            "desc" : ""
        }

        # Human-readable description used in the log messages.
        if task["action"] == "buy":
            task["desc"] = task["symbol"] + " 开多仓,数量" + str(task["amount"])
        elif task["action"] == "sell":
            task["desc"] = task["symbol"] + " 开空仓,数量" + str(task["amount"])
        elif task["action"] == "closebuy":
            task["desc"] = task["symbol"] + " 平多仓,数量" + str(task["amount"])
        elif task["action"] == "closesell":
            task["desc"] = task["symbol"] + " 平空仓,数量" + str(task["amount"])
        else:
            task["desc"] = task["symbol"] + " " + task["action"] + ", 数量 " + str(task["amount"])

        self.tasks.append(task)
        Log("接收到任务", task["desc"])

    def cancelAll(self, e):
        """Cancel pending orders on ``e`` until none are left.

        Returns:
            ERR_SUCCESS, or ERR_GET_ORDERS when the order query returns None.
        """
        while True:
            orders = e.GetOrders()
            if orders is None:
                return self.ERR_GET_ORDERS
            if len(orders) == 0:
                break
            for i in range(len(orders)):
                e.CancelOrder(orders[i]["Id"])
                Sleep(self.retryInterval)
        return self.ERR_SUCCESS

    def pollTask(self, task):
        """Drive one task: cancel leftovers, measure progress, place orders.

        Loops until the requested amount is filled, ``maxRetry`` is reached or
        a close task finds no position on its first pass; the task is then
        marked finished and the callbacks receive ``ret`` - a dict with
        ``price``, ``amount`` and ``position``, or None on failure.  Any API
        failure returns early with the matching ERR_* code and leaves the task
        unfinished so that a later poll() retries it.

        Returns:
            ERR_SUCCESS when the task finished, otherwise an ERR_* code.
        """
        insDetail = task["e"].SetContractType(task["symbol"])
        if insDetail is None:
            return self.ERR_SET_SYMBOL
        # Some brokers' servers report 0 here; fall back to 50 lots per order.
        if insDetail["MaxLimitOrderVolume"] == 0:
            insDetail["MaxLimitOrderVolume"] = 50
        ret = False
        isCover = (task["action"] != "buy") and (task["action"] != "sell")
        while True:
            if not ext.IsTrading(task["symbol"]):
                return self.ERR_NOT_TRADEING
            Sleep(500)
            ret = self.cancelAll(task["e"])
            if ret != self.ERR_SUCCESS:
                return ret
            positions = task["e"].GetPosition()
            if positions is None:
                return self.ERR_GET_POS
            # Remember each entry's own amount: the aggregation below writes
            # the running total into the first matching entry (``pos`` is that
            # very object), and the close loop must size its orders from the
            # per-entry amounts, not from the total.
            rawAmounts = [p["Amount"] for p in positions]
            pos = None
            for i in range(len(positions)):
                if (positions[i]["ContractType"] == task["symbol"] and (((positions[i]["Type"] == PD_LONG or positions[i]["Type"] == PD_LONG_YD) and (task["action"] == "buy" or task["action"] == "closebuy")) or ((positions[i]["Type"] == PD_SHORT or positions[i]["Type"] == PD_SHORT_YD) and (task["action"] == "sell" or task["action"] == "closesell")))):
                    if pos is None:
                        pos = positions[i]
                        pos["Cost"] = positions[i]["Price"] * positions[i]["Amount"]
                    else:
                        pos["Amount"] += positions[i]["Amount"]
                        pos["Profit"] += positions[i]["Profit"]
                        pos["Cost"] += positions[i]["Price"] * positions[i]["Amount"]
            if not task["init"]:
                # First pass: record the starting position as the baseline.
                task["init"] = True
                if pos:
                    task["preAmount"] = pos["Amount"]
                    task["preCost"] = pos["Cost"]
                else:
                    task["preAmount"] = 0
                    task["preCost"] = 0
                    if isCover:
                        Log("找不到仓位", task["symbol"], task["action"])
                        ret = None
                        break
            remain = task["amount"]
            if isCover and (pos is None):
                # Position fully closed: treat it as an empty position.
                pos = {"Amount": 0, "Cost": 0, "Price": 0}
            if pos:
                task["dealAmount"] = pos["Amount"] - task["preAmount"]
                if isCover:
                    task["dealAmount"] = -task["dealAmount"]
                remain = task["amount"] - task["dealAmount"]
                if (remain <= 0 or task["retry"] >= task["maxRetry"]):
                    ret = {
                        "price" : (0 if task["dealAmount"] == 0 else (pos["Cost"] - task["preCost"]) / (pos["Amount"] - task["preAmount"])),
                        "amount" : (pos["Amount"] - task["preAmount"]),
                        "position" : pos
                    }
                    if isCover:
                        ret["amount"] = -ret["amount"]
                        if pos["Amount"] == 0:
                            ret["position"] = None
                    break
            elif task["retry"] >= task["maxRetry"]:
                ret = None
                break

            depth = task["e"].GetDepth()
            if depth is None:
                return self.ERR_GET_DEPTH
            orderId = None
            slidePrice = insDetail["PriceTick"] * SlideTick
            if isCover:
                for i in range(len(positions)):
                    if positions[i]["ContractType"] != task["symbol"]:
                        continue
                    if (int(remain) < 1):
                        break
                    amount = min(insDetail["MaxLimitOrderVolume"], rawAmounts[i], remain)
                    if (task["action"] == "closebuy" and (positions[i]["Type"] == PD_LONG or positions[i]["Type"] == PD_LONG_YD)):
                        task["e"].SetDirection("closebuy_today" if positions[i]["Type"] ==  PD_LONG else "closebuy")
                        amount = min(amount, depth["Bids"][0]["Amount"])
                        orderId = task["e"].Sell(_N(depth["Bids"][0]["Price"] - slidePrice, 2), amount, task["symbol"], "平今" if positions[i]["Type"] == PD_LONG else "平昨", "Bid", depth["Bids"][0])
                        remain -= amount
                    elif (task["action"] == "closesell" and (positions[i]["Type"] == PD_SHORT or positions[i]["Type"] == PD_SHORT_YD)):
                        task["e"].SetDirection("closesell_today" if positions[i]["Type"] == PD_SHORT else "closesell")
                        amount = min(amount, depth["Asks"][0]["Amount"])
                        orderId = task["e"].Buy(_N(depth["Asks"][0]["Price"] + slidePrice, 2), amount, task["symbol"], "平今" if positions[i]["Type"] == PD_SHORT else "平昨", "Ask", depth["Asks"][0])
                        remain -= amount
            else:
                if task["action"] == "buy":
                    task["e"].SetDirection("buy")
                    orderId = task["e"].Buy(_N(depth["Asks"][0]["Price"] + slidePrice, 2), min(remain, depth["Asks"][0]["Amount"]), task["symbol"], "Ask", depth["Asks"][0])
                else :
                    task["e"].SetDirection("sell")
                    orderId = task["e"].Sell(_N(depth["Bids"][0]["Price"] - slidePrice, 2), min(remain, depth["Bids"][0]["Amount"]), task["symbol"], "Bid", depth["Bids"][0])

            if orderId is None:
                task["retry"] += 1
                return self.ERR_TRADE

        task["finished"] = True

        if self.onTaskFinish:
            self.onTaskFinish(task, ret)

        if task["onFinish"]:
            task["onFinish"](task, ret)

        return self.ERR_SUCCESS

    def poll(self):
        """Run pollTask() on every unfinished task; empty the queue once no
        unfinished task was found."""
        processed = 0

        for task in self.tasks :
            if not task["finished"] :
                processed += 1
                self.pollTask(task)

        if processed == 0:
            self.tasks = []

    def size(self):
        """Return the number of tasks currently stored (finished ones included
        until poll() clears them)."""
        return len(self.tasks)

    def GetPosition(self, e, contractType, direction=None, positions=None):
        """Delegate to the module-level GetPosition()."""
        return GetPosition(e, contractType, direction, positions)

    def hasTask(self, symbol):
        """Report whether work is queued.

        With a ``str`` symbol: True if an unfinished task for that symbol
        exists.  With anything else: True if the queue holds any task at all.
        """
        if type(symbol) != str:
            return len(self.tasks) > 0
        for task in self.tasks:
            if task["symbol"] == symbol and not task["finished"]:
                return True
        return False




# NewPositionManager 类
class NewPositionManager:
    """Blocking (non-concurrent) trading helper bound to one exchange object.

    The first account snapshot taken is kept in ``self.account`` and serves as
    the baseline for Profit(); GetAccount() replaces it.
    """
    NewPositionManagerCount = 0

    def __init__(self, e):
        """Bind the manager to exchange object ``e``; no request is made yet."""
        self.e = e
        self.account = None

    def Account(self):
        """Return the cached account snapshot, fetching it on first use."""
        if self.account is None:
            self.account = _C(self.e.GetAccount)
        return self.account

    def GetAccount(self, getTable = False):
        """Refresh the cached account snapshot.

        Returns:
            The fresh snapshot, or - when ``getTable`` is truthy - the table
            dict built by AccountToTable() from the exchange's raw JSON.
        """
        self.account = _C(self.e.GetAccount)
        if getTable:
            return AccountToTable(self.e.GetRawJSON())
        return self.account

    def GetPosition(self, contractType, direction, positions = None):
        """Delegate to the module-level GetPosition() for this exchange."""
        return GetPosition(self.e, contractType, direction, positions)

    def OpenLong(self, contractType, shares):
        """Open ``shares`` long lots of ``contractType``; see Open()."""
        if self.account is None:
            self.account = _C(self.e.GetAccount)
        return Open(self.e, contractType, PD_LONG, shares)

    def OpenShort(self, contractType, shares):
        """Open ``shares`` short lots of ``contractType``; see Open()."""
        if self.account is None:
            self.account = _C(self.e.GetAccount)
        return Open(self.e, contractType, PD_SHORT, shares)

    def Cover(self, contractType):
        """Close all holdings of ``contractType``; see the module-level Cover()."""
        if self.account is None:
            self.account = _C(self.e.GetAccount)
        return Cover(self.e, contractType)

    def CoverAll(self):
        """Close every position whose contract code contains no '&'.

        Entries with '&' in the contract code are skipped, as before.  The
        loop now ends once only such entries remain; previously it would spin
        forever, without sleeping, in that situation.
        """
        if self.account is None:
            self.account = _C(self.e.GetAccount)
        while True:
            positions = _C(self.e.GetPosition)
            coverable = [p for p in positions if '&' not in p['ContractType']]
            if len(coverable) == 0:
                break
            for p in coverable:
                Cover(self.e, p['ContractType'])
                Sleep(1000)

    def Profit(self):
        """Return the current balance minus the baseline snapshot's balance.

        The baseline is taken now if none exists yet (the result is then about
        0) instead of failing on None.  Fields are read with subscripts, like
        every other exchange structure in this file.
        """
        baseline = self.Account()
        accountNow = _C(self.e.GetAccount)
        return _N(accountNow["Balance"] - baseline["Balance"])

    # NewPositionManager END




# 导出函数实现
def CreateNewPositionManager(e = exchange):
    """Build a NewPositionManager for ``e`` after validating it.

    Raises:
        Exception: if ``e`` is not one of ``exchanges`` or its name is not
            'Futures_CTP'.
    """
    if e not in exchanges:
        raise Exception("error exchange", e)
    # The original tested the name in two version-specific arms that were
    # identical, so a single comparison is equivalent.
    if e.GetName() != 'Futures_CTP':
        raise Exception("error exchange, 本模板适用于CTP商品期货,当前添加的交易所为:", e.GetName())
    return NewPositionManager(e)

def IsTrading(symbol):
    """Tell whether ``symbol`` is inside a regular trading session right now.

    The decision uses the local clock only (weekday, hour, minute); holidays
    are not considered.  The product code is the upper-cased run of
    characters before the first digit of ``symbol``.

    Day sessions: index futures IH/IF/IC/IM 9:30-11:30 and 13:00-15:00;
    treasury futures TF/T/TS/TL 9:30-11:30 and 13:00-15:15; everything else
    9:00-10:15, 10:30-11:30 and 13:30-15:00.  IM and TL were previously
    missing and fell through to the commodity timetable.
    Night sessions start at 21:00 and end at 23:00, 1:00 or 2:30 depending on
    the product; products in none of the night lists have no night session.

    Returns:
        bool
    """
    localNow = time.localtime(time.time())
    day = localNow.tm_wday    # 0 = Monday ... 6 = Sunday
    hour = localNow.tm_hour   # 0..23
    minute = localNow.tm_min  # 0..59

    # Closed all Sunday, and on Saturday once the latest night session is over.
    if day == 6 or (day == 5 and (hour > 2 or (hour == 2 and minute > 30))):
        return False

    shortName = ""
    for ch in symbol:
        if ch.isdigit():
            break
        shortName += ch.upper()

    # Each entry is [startHour, startMinute, endHour, endMinute].
    if shortName in ("IH", "IF", "IC", "IM"):
        period = [
            [9, 30, 11, 30],
            [13, 0, 15, 0]
        ]
    elif shortName in ("TF", "T", "TS", "TL"):
        period = [
            [9, 30, 11, 30],
            [13, 0, 15, 15]
        ]
    else:
        period = [
            [9, 0, 10, 15],
            [10, 30, 11, 30],
            [13, 30, 15, 0]
        ]

    isWeekday = day >= 0 and day <= 4
    if isWeekday:
        for p in period:
            if ((hour > p[0] or (hour == p[0] and minute >= p[1])) and (hour < p[2] or (hour == p[2] and minute < p[3]))):
                return True

    nperiod = [
        [
            ['SC', 'AU', 'AG'],
            [21, 0, 2, 30]
        ],
        [
            ['CU', 'AL', 'ZN', 'PB', 'SN', 'NI', 'SS', 'BC', 'AO'],
            [21, 0, 1, 0]
        ],
        [
            ['NR', 'BU', 'HC', 'RB', 'RU', 'SP', 'FU', 'ZC', 'CF', 'CY', 'FG', 'MA', 'PF', 'OI', 'RM', 'SR', 'TA', 'SA', 'A', 'B', 'C', 'CS', 'I', 'J', 'JM', 'L', 'M', 'P', 'PP', 'V', 'Y', 'EG', 'RR', 'EB', 'PG', 'LU', 'PX', 'BR', 'PR', 'SH'],
            [21, 0, 23, 0]
        ]
    ]

    for products, p in nperiod:
        if shortName not in products:
            continue
        condA = hour > p[0] or (hour == p[0] and minute >= p[1])   # at or after the start
        condB = hour < p[2] or (hour == p[2] and minute < p[3])    # before the end
        if p[2] >= p[0]:
            # Session ends on the same calendar day.
            return bool(isWeekday and condA and condB)
        # Session crosses midnight: the evening part runs Mon-Fri, the
        # after-midnight part Tue-Sat.
        return bool((isWeekday and condA) or ((day >= 1 and day <= 5) and condB))
    return False

def CreateNewTaskQueue(onTaskFinish = None):
    """Return a new NewTaskQueue, forwarding the optional finish callback."""
    return NewTaskQueue(onTaskFinish)

# 合约名称转换产品名称
def ins2product(symbol):
    """Reduce a contract code to its product code.

    Codes longer than six characters that contain no space are returned
    unchanged.  Otherwise the "SPD " / "SP " prefixes are removed and the
    characters before the first ASCII digit are returned.
    """
    if ' ' not in symbol and len(symbol) > 6:
        return symbol

    stripped = symbol.replace("SPD ", "").replace("SP ", "")
    for idx, ch in enumerate(stripped):
        if '0' <= ch <= '9':
            return stripped[:idx]
    return stripped

# CTA函数
def CTA(contractType, onTick, interval = 500):
    """Run a CTA strategy loop over several contracts; never returns.

    Args:
        contractType: comma-separated entries, each either ``"code"`` or
            ``"chartCode/tradeCode"`` (K-lines are read from the first code,
            orders go to the second).
        onTick: callable receiving a dict with ``records``, ``symbol``,
            ``detail``, ``account``, ``position`` (this entry's holding) and
            ``positions`` (all holdings).  It returns a number ``n`` or a list
            ``[n, callback]``: n > 0 buys n lots (closing shorts first),
            n < 0 sells -n lots (closing longs first), n == 0 closes the
            holding; any non-numeric result means "do nothing".
        interval: minimum duration of one loop iteration, handed to Sleep().

    Trade codes containing "888" or "000" are followed through changes of the
    resolved ``InstrumentID``: existing holdings are moved to the new one.

    Fixes over the previous version: raising a plain string (a TypeError) is
    replaced by a real exception; the last-refresh time and the account
    snapshot now live in a shared ``state`` dict, because assignments inside
    the nested functions only created locals - the periodic refresh therefore
    ran on every iteration and the queue callback never updated the account.
    """
    SetErrorFilter("login|ready|初始化")   # suppress these three kinds of error messages
    exchange.IO("mode", 0)
    e = exchange
    symbols = contractType.split(",")
    holds = {}
    # Shared mutable state for the nested functions (no `nonlocal`, so the
    # code keeps working on Python 2).
    state = {"lastUpdate": 0, "account": None}

    def findChartSymbol(ct):
        """Return the chart code of the entry trading the same product as ``ct``."""
        product = ins2product(ct)
        for i in symbols:
            tmp = i.split("/")
            if ins2product(tmp[-1]) == product:
                return tmp[0]
        return None

    def refreshHold(qSize=0):
        """Rebuild ``holds`` from the exchange, refresh the account snapshot
        and optionally redraw the status tables.

        Raises:
            Exception: if long and short holdings of one entry coexist while
                no task is queued (``qSize == 0``).
        """
        while not e.IO("status"):
            LogStatus(_D(), "Not connect")
            Sleep(1000)

        for ins in symbols:
            tmp = ins.split("/")
            if len(tmp) == 2:
                holds[tmp[0]] = {
                    "price" : 0,
                    "value" : 0,
                    "amount" : 0,
                    "profit" : 0,
                    "symbol" : tmp[1]
                }
            else :
                holds[ins] = {
                    "price" : 0,
                    "value" : 0,
                    "amount" : 0,
                    "profit" : 0,
                    "symbol" : ins,
                }
        positions = _C(e.GetPosition)
        for pos in positions:
            mapCT = findChartSymbol(pos["ContractType"])
            if not mapCT:
                continue
            if mapCT not in holds:
                continue
            hold = holds[mapCT]
            # amount is signed: positive for long, negative for short.
            if pos["Type"] == PD_LONG or pos["Type"] == PD_LONG_YD:
                if hold["amount"] < 0 and qSize == 0:
                    raise Exception("不能同时持有多仓空仓")
                hold["amount"] += pos["Amount"]
            else :
                if hold["amount"] > 0 and qSize == 0:
                    raise Exception("不能同时持有多仓空仓")
                hold["amount"] -= pos["Amount"]
            hold["value"] += pos["Price"] * pos["Amount"]
            hold["profit"] += pos["Profit"]
            if hold["amount"] != 0:
                hold["price"] = _N(hold["value"] / abs(hold["amount"]))
        account = _C(e.GetAccount)
        if CTAShowPosition:
            tblPosition = {
                "type" : "table",
                "title" : "持仓状态",
                "cols" : ["品种", "方向", "均价", "数量", "浮动盈亏"],
                "rows" : [],
            }
            for pos in positions:
                tblPosition["rows"].append([pos["ContractType"], "多#0000ff" if (pos["Type"] == PD_LONG or pos["Type"] == PD_LONG_YD) else "空#ff0000", pos["Price"], pos["Amount"], pos["Profit"]])
            tblAccount = ext.AccountToTable(e.GetRawJSON(), "资金信息")
            LogStatus("`" + json.dumps([tblPosition, tblAccount]) + "`\n", "更新于:" + _D())
        state["lastUpdate"] = time.time() * 1000
        state["account"] = account
        return account

    def callBack_NewTaskQueue(task, ret):
        # Runs after every finished task: resynchronise holdings and account.
        Log("任务结束", task["desc"])
        refreshHold(q.size())

    def callBack_pushTaskCover(task, ret):
        Log("切换合约平仓成功", task["desc"], ret)

    def callBack_pushTaskOpen(task, ret):
        Log("切换合约开仓成功", task["desc"], ret)

    refreshHold(0)
    q = ext.NewTaskQueue(callBack_NewTaskQueue)
    # trade code -> [currently resolved instrument, previous instrument]
    mainCache = {}
    while True:
        ts = time.time() * 1000
        for ins in symbols:
            ctChart = ins
            ctTrade = ins
            tmp = ins.split("/")
            if len(tmp) == 2:
                ctChart = tmp[0]
                ctTrade = tmp[1]
            if not e.IO("status") or not ext.IsTrading(ctChart) or not ext.IsTrading(ctTrade) or q.hasTask(ctTrade):
                continue
            # A contract roll-over is still in progress for this entry.
            if ctTrade in mainCache and (q.hasTask(mainCache[ctTrade][0]) or q.hasTask(mainCache[ctTrade][1])):
                continue

            c = e.SetContractType(ctChart)
            if not c:
                continue
            r = e.GetRecords()
            if r is None or len(r) == 0:
                continue
            insDetail = e.SetContractType(ctTrade)
            if not insDetail:
                continue
            tradeSymbol = insDetail["InstrumentID"]
            if ctTrade.find("888") != -1 or ctTrade.find("000") != -1:
                preMain = ""
                isSwitch = False
                positions = None
                if ctTrade not in mainCache:
                    if not IsVirtual():
                        Log(ctTrade, "当前主力合约为:", tradeSymbol)
                    positions = e.GetPosition()
                    if positions is None:
                        continue
                    # Seed the cache with the instrument actually held, so a
                    # holding in an older instrument triggers a roll-over below.
                    product = ins2product(ctTrade)
                    for p in positions:
                        if ins2product(p["ContractType"]) == product:
                            mainCache[ctTrade] = [p["ContractType"], p["ContractType"]]
                if ctTrade in mainCache and mainCache[ctTrade][0] != tradeSymbol:
                    preMain = mainCache[ctTrade][0]
                    Log(ctTrade, "主力合约切换为:", tradeSymbol, "之前为:", preMain, "#ff0000")
                    if positions is None:
                        positions = e.GetPosition()
                        if positions is None:
                            continue
                    for p in positions:
                        if p["ContractType"] == preMain:
                            isLong = p["Type"] == PD_LONG or p["Type"] == PD_LONG_YD
                            q.pushTask(e, p["ContractType"], "closebuy" if isLong else "closesell", p["Amount"], callBack_pushTaskCover)
                            q.pushTask(e, tradeSymbol, "buy" if isLong else "sell", p["Amount"], callBack_pushTaskOpen)
                            isSwitch = True
                mainCache[ctTrade] = [tradeSymbol, preMain]
                if isSwitch:
                    Log("开始移仓", preMain, "移到", tradeSymbol)
                    continue
            hold = holds[ctChart]
            n = onTick({
                "records" : r,
                "symbol" : tradeSymbol,
                "detail" : insDetail,
                "account" : state["account"],
                "position" : hold,
                "positions" : holds,
            })
            callBack = None
            if type(n) == typeOfstr("list") and len(n) > 1:
                if type(n[1]) == types.FunctionType:
                    callBack = n[1]
                n = n[0]
            if type(n) != typeOfstr("int") and type(n) != typeOfstr("float"):
                continue
            if n > 0:
                if hold["amount"] < 0:
                    q.pushTask(e, tradeSymbol, "closesell", min(-hold["amount"], n), callBack)
                    n += hold["amount"]
                if n > 0:
                    q.pushTask(e, tradeSymbol, "buy", n, callBack)
            elif n < 0:
                if hold["amount"] > 0:
                    q.pushTask(e, tradeSymbol, "closebuy", min(hold["amount"], -n), callBack)
                    n += hold["amount"]
                if n < 0:
                    q.pushTask(e, tradeSymbol, "sell", -n, callBack)
            elif n == 0 and hold["amount"] != 0:
                q.pushTask(e, tradeSymbol, "closebuy" if hold["amount"] > 0 else "closesell", abs(hold["amount"]), callBack)
        q.poll()

        now = time.time() * 1000
        if now - state["lastUpdate"] > SyncInterval * 1000:
            refreshHold(q.size())
        delay = interval - (now - ts)
        if delay > 0:
            Sleep(delay)

def Cross(array1, array2):
    """Count how many of the newest elements array1 has stayed on one side of array2.

    Both series are walked from the last element towards the first. The walk
    stops at the first position where either value is None, where the two
    values are equal, or where the relation between them flips.

    Args:
        array1: sequence of numbers (None entries are allowed), e.g. a fast line.
        array2: sequence of the same length as array1, e.g. a slow line.

    Returns:
        int: k > 0 when array1 > array2 for the last k elements,
        -k when array1 < array2 for the last k elements,
        0 when the newest pair is equal, contains None, or the inputs are empty.

    Raises:
        ValueError: if the two sequences have different lengths.
    """
    if len(array1) != len(array2):
        # A bare string is not a valid exception object; raise a real exception
        # so the caller sees the intended message.
        raise ValueError("array length not equal")
    n = 0
    # reversed() walks newest-first without copying and without mutating the
    # caller's sequences; it also works for tuples and on Python 2 lists,
    # which have no .copy() method.
    for v1, v2 in zip(reversed(array1), reversed(array2)):
        if v1 is None or v2 is None:
            break
        if v1 < v2:
            if n > 0:
                break
            n -= 1
        elif v1 > v2:
            if n < 0:
                break
            n += 1
        else:
            break
    return n


# Exported functions: attach this library's public entry points to the `ext`
# object so callers can reach them as ext.<Name>(...), e.g. ext.CTA / ext.Cross.
ext.NewPositionManager = CreateNewPositionManager
ext.IsTrading = IsTrading
ext.AccountToTable = AccountToTable
ext.NewTaskQueue = CreateNewTaskQueue
ext.CTA = CTA 
ext.Cross = Cross

# Test helper. (IsVirtual() tells whether the strategy runs in a backtest.)
def VGetRawJSON():
    """Stand-in for GetRawJSON, for testing only.

    Returns:
        str: a JSON-encoded fake account snapshot, stamped with the current
        time.time() value and its _D() rendering.
    """
    stamp = time.time()
    fake_account = {
        "AccountID": "073997",
        "Available": 1331.445464656656,
        "Balance": 1331.3344567,
        "BrokerID": "9999",
        "time": stamp,
        "_D": _D(stamp),
        "CurrMargin": 0,
    }
    return json.dumps(fake_account)

def main():
    """Manual test entry point for the library.

    The two triple-quoted blocks below are disabled test harnesses, kept as
    inert strings. The only live test runs ext.CTA with a 5/20 EMA crossover
    callback.
    """
    '''
    # 测试 AccountToTable
    Log("测试 AccountToTable 函数 ")
    Str = '{"AccountID": "073997", "Available": 1331.445464656656, "Balance": 1331.3344567, "BrokerID": "9999", "CashIn": 0, "XXX": "dd"}'
    table = AccountToTable(Str)
    Log(json.dumps(table))
    LogStatus('`' + json.dumps(table) + '`')

    # 测试 IsTrading
    Log("now time", _D(), "isTrading('MA701'): ", ext.IsTrading("MA701"))   
    Log("now time", _D(), "isTrading('SR701'): ", ext.IsTrading("SR701")) 
    Log("now time", _D(), "isTrading('jd1701'): ", ext.IsTrading("jd1701")) 

    # 测试 NewPositionManager 导出函数 生成对象
    obj = ext.NewPositionManager()
    Log(obj.Account(), obj.account)
    
    # 测试 NewPositionManager 类 实例的成员函数 GetAccount
    retGetAccount = obj.GetAccount(True)
    Log(retGetAccount)
    LogStatus('`' + json.dumps(retGetAccount) + '`')    
    
    # 测试 OpenLong 、 OpenShort 、GetPosition
    open_long = obj.OpenLong("MA701", 2)
    open_short1 = obj.OpenShort("SR701", 3)
    open_short2 = obj.OpenShort("jd1701", 4)
    Log("open_long:", open_long)
    Log("open_short1:", open_short1)    
    Log("open_short2:", open_short2)
    positions = obj.GetPosition("MA701", PD_SHORT)
    Log("get MA701 PD_SHORT:", positions)
    positions = obj.GetPosition("MA701", PD_LONG)
    Log("get MA701 PD_LONG:", positions)

    # 测试 CoverAll
    obj.Cover("MA701")

    # 测试 CoverAll 
    obj.CoverAll()
    
    # 读取 Cover  、CoverAll 后的 持仓信息
    positions = exchange.GetPosition()
    Log("now Positions:", positions)

    # 测试 GetAccount 、 Profit
    # Log("Account:", obj.GetAccount())
    Log("Profit:", obj.Profit())
    Log("account", obj.Account())
    Log("Account:", obj.GetAccount())

    # 测试 IsTrading
    Log(ext.IsTrading("MA701"))    

    # 测试 ext.AccountToTable
    Str = '{"AccountID": "000", "Available": 55, "Balance": 55, "BrokerID": "9999", "CashIn": 0, "ZZZ": "YHU"}'
    table = ext.AccountToTable(Str)
    Log(json.dumps(table))
    LogStatus('`' + json.dumps(table) + '`')

    Log("-------------------------------------------------------------")
    '''
    
    # Disabled harness for NewTaskQueue (kept as an inert string).
    '''
    q = ext.NewTaskQueue()
    
    def NoName2(task, ret):
        Log(task["desc"], ret, "#FF0000")

    def NoName1(task, ret):
        Log(task["desc"], ret, "#FF0000")
        if ret:
            q.pushTask(exchange, "MA701", "closebuy", 1, NoName2)

    q.pushTask(exchange, "MA701", "buy", 3, lambda task, ret: Log(task["desc"], ret, q.pushTask(exchange, "MA701", "closebuy", 1, lambda task, ret: Log(task["desc"], ret, "#FF0000")) if ret else "", "#FF0000"))

    Log("q.tasks`s length :", q.size())
    
    while True:
        q.poll()
        Sleep(1000)
    '''
    
    # Live test: drive ext.CTA with an EMA(5)/EMA(20) crossover callback.
    def callBack_CTA(st):
        # Not enough bars yet for the 20-period EMA: give no signal.
        bars = st["records"]
        if len(bars) < 20:
            return
        slow_line = TA.EMA(bars, 20)
        fast_line = TA.EMA(bars, 5)
        cross = ext.Cross(fast_line, slow_line)

        held = st["position"]["amount"]
        # Fast line above slow for more than 2 bars while flat or short:
        # return 2 when short, otherwise 1.
        if held <= 0 and cross > 2:
            Log("金叉周期", cross, "当前持仓:", st["position"])
            return 2 if held < 0 else 1
        # Fast line below slow for more than 2 bars while flat or long:
        # return -2 when long, otherwise -1.
        if held >= 0 and cross < -2:
            Log("死叉周期", cross, "当前持仓:", st["position"])
            return -2 if held > 0 else -1
        return None

    symbols = "MA000/MA888,rb000/rb888,i000/i888"
    ret = ext.CTA(symbols, callBack_CTA)
    

更多内容

lwc87_qh 发现个问题,cross函数里用到了reverse函数倒置序列,此时如果在使用了cross函数判定后还需要使用序列,就有可能因为序列已经倒置而报错,cross函数里执行完运算后没有将序列重置回去

雨幕(youquant) 好的,感谢建议。