threading
对象作为全局多线程管理工具,提供了创建并发线程、线程锁、条件对象等功能;本章节介绍threading
对象的成员函数;仅JavaScript
语言策略支持该对象。
Thread()
函数用于创建并发线程。
Thread()
函数返回一个Thread
对象,用于管理创建出的并发线程、线程通信等。
Thread
对象
Thread(func, …args) Thread(…items)
参数func
是用于并发执行的函数(通过引用传递),支持传入匿名函数。func
可接受多个参数,这些参数将在并发执行时通过...args
传入。因此,func
的参数列表需要与...args
一致。
func
true
function
参数arg
是在回调执行时传递给func
(即并发线程执行函数)的实际参数;参数arg
可能有多个,func
的参数列表需要与...args
一致。
arg
false
string、number、bool、object、array、function、空值等系统支持的所有类型
参数item
是一个数组,包含待并发执行的函数引用及其参数,调用Thread
函数时的参数item
可以传入多组。
item true array
function test1(a, b, c) {
Log("test1:", a, b, c)
}
function main() {
var t1 = threading.Thread(test1, 1, 2, 3)
var t2 = threading.Thread(function (msg) {
Log("msg:", msg)
}, "Hello thread2")
t1.join()
t2.join()
}
同时创建一个自定义函数和一个匿名函数的并发线程。
function test1(msg) {
Log("msg:", msg)
test2("Hello test2")
}
function main() {
var t1 = threading.Thread(
[function(a, b, c) {Log(a, b, c)}, 1, 2, 3],
[test1, "Hello test1"],
[`function test2(msg) {Log("msg:", msg)}`])
t1.join()
}
使用Thread(...items)
形式来创建并发线程,顺序执行多个函数。
function testFunc1(p) {
Log("testFunc1 p:", p)
}
function main() {
threading.Thread(function(pfn) {
var threadName = threading.currentThread().name()
var threadId = threading.currentThread().id()
pfn(`in thread threadName: ${threadName}, threadId: ${threadId}`)
}, testFunc1).join()
}
支持向并发执行函数的参数传入函数。
function ml(input) {
const net = new brain.NeuralNetwork()
net.train([
{ input: [0, 0], output: [0] },
{ input: [0, 1], output: [1] },
{ input: [1, 0], output: [1] },
{ input: [1, 1], output: [0] },
])
return net.run(input)
}
function main() {
var ret = threading.Thread([ml, [1, 0]], [HttpQuery("https://unpkg.com/brain.js")]).join()
// ret: {"id":1,"terminated":false,"elapsed":337636000,"ret":{"0":0.9339330196380615}}
Log(ret)
}
支持传入函数字符串,可以动态导入外部库进行并发计算。
传入Thread()
函数用于并发执行的线程函数func
运行在隔离环境中,因此无法直接引用线程外部的变量,引用时会编译失败。同时,线程内不支持引用其它闭包函数。线程内部可调用平台提供的所有API,但不能调用用户自定义的其他函数。
支持回测系统、实盘环境;所有并发线程相关的函数,在回测系统中仅作为代码兼容支持,实际不会真正并发线程执行,本章不再赘述。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
getThread()
函数用于根据指定的线程Id获取线程对象。
getThread()
函数返回通过参数指定threadId的Thread
对象
Thread
对象
getThread(threadId)
参数threadId
为线程对象Id,通过指定参数获取对应的线程对象。
threadId true number
function main() {
var t1 = threading.Thread(function () {
Log("Hello thread1")
})
// Thread 对象有方法:id(),用于获取线程的Id,可以查看文档对应Thread对象的章节
var threadId = t1.id()
var threadName = t1.name()
Log("threadId:", threadId, ", threadName:", threadName)
var t2 = threading.getThread(threadId)
Log(`threadId == t2.id():`, threadId == t2.id(), `, threadName == t2.name():`, threadName == t2.name())
}
通过threadId
获取指定的线程对象。
支持回测系统、实盘环境。
如果期望获取的线程已经执行完毕、释放,此时无法通过threading.getThread(threadId)
获取该线程的线程对象。
{@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
mainThread()
函数用于获取主线程的线程对象,即策略中main()
函数所在的线程。
mainThread()
函数返回主线程的线程对象。
Thread
对象
mainThread()
function main() {
Log("主线程的threadId:", threading.mainThread().id())
}
获取主线程的Thread
对象,输出主线程的threadId
。
function test() {
Log("test函数中输出主线程Id:", threading.mainThread().id())
}
function main() {
var t1 = threading.Thread(test)
t1.join()
}
在并发的线程中也可以获取主线程的线程对象。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
currentThread()
函数用于获取当前线程的线程对象。
currentThread()
函数返回当前线程的线程对象。
Thread
对象
currentThread()
function test() {
Log("当前线程的Id:", threading.currentThread().id())
}
function main() {
var t1 = threading.Thread(test)
t1.join()
}
获取当前线程的Thread
对象,输出当前线程的threadId
。
支持回测系统、实盘环境。
{@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
Lock()
函数用于创建线程锁对象。
Lock()
函数返回一个线程锁对象。
ThreadLock
对象
Lock()
function consumer(productionQuantity, dict, lock) {
for (var i = 0; i < productionQuantity; i++) {
lock.acquire()
var count = dict.get("count")
Log("consumer:", count)
Sleep(1000)
lock.release()
}
}
function producer(productionQuantity, dict, lock) {
for (var i = 0; i < productionQuantity; i++) {
lock.acquire()
dict.set("count", i)
Log("producer:", i)
Sleep(1000)
lock.release()
}
}
function main() {
var dict = threading.Dict()
dict.set("count", -1)
var lock = threading.Lock()
var productionQuantity = 10
var producerThread = threading.Thread(producer, productionQuantity, dict, lock)
var consumerThread = threading.Thread(consumer, productionQuantity, dict, lock)
consumerThread.join()
producerThread.join()
}
两个并发线程访问公共资源。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
Condition()
函数用于创建一个条件变量对象,该对象用于在多线程并发环境中实现线程间的同步与通信。通过Condition()
,一个线程可以在某些条件未满足时等待,直到另一个线程通知它条件已经满足。
Condition()
函数返回一个ThreadCondition
对象。
ThreadCondition
对象
Condition()
function consumer(productionQuantity, dict, condition) {
for (var i = 0; i < productionQuantity; i++) {
condition.acquire()
while (dict.get("array").length == 0) {
condition.wait()
}
var arr = dict.get("array")
var count = arr.shift()
dict.set("array", arr)
Log("consumer:", count, ", array:", arr)
condition.release()
Sleep(1000)
}
}
function producer(productionQuantity, dict, condition) {
for (var i = 0; i < productionQuantity; i++) {
condition.acquire()
var arr = dict.get("array")
arr.push(i)
dict.set("array", arr)
Log("producer:", i, ", array:", arr)
condition.notify()
condition.release()
Sleep(1000)
}
}
function main() {
var dict = threading.Dict()
dict.set("array", [])
var condition = threading.Condition()
var productionQuantity = 10
var producerThread = threading.Thread(producer, productionQuantity, dict, condition)
var consumerThread = threading.Thread(consumer, productionQuantity, dict, condition)
consumerThread.join()
producerThread.join()
}
两个并发线程访问公共资源。
回测系统没有实现该功能,仅仅是定义。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
Event()
函数用于创建一个线程事件对象,该对象用于线程间的同步,允许一个线程等待另一个线程的通知或信号。
Event()
函数返回一个ThreadEvent
对象。
ThreadEvent
对象
Event()
function consumer(productionQuantity, dict, pEvent, cEvent) {
for (var i = 0; i < productionQuantity; i++) {
while (dict.get("array").length == 0) {
pEvent.wait()
}
if (pEvent.isSet()) {
pEvent.clear()
}
var arr = dict.get("array")
var count = arr.shift()
dict.set("array", arr)
Log("consumer:", count, ", array:", arr)
cEvent.set()
Sleep(1000)
}
}
function producer(productionQuantity, dict, pEvent, cEvent) {
for (var i = 0; i < productionQuantity; i++) {
while (dict.get("array").length != 0) {
cEvent.wait()
}
if (cEvent.isSet()) {
cEvent.clear()
}
var arr = dict.get("array")
arr.push(i)
dict.set("array", arr)
Log("producer:", i, ", array:", arr)
pEvent.set()
Sleep(1000)
}
}
function main() {
var dict = threading.Dict()
dict.set("array", [])
var pEvent = threading.Event()
var cEvent = threading.Event()
var productionQuantity = 10
var producerThread = threading.Thread(producer, productionQuantity, dict, pEvent, cEvent)
var consumerThread = threading.Thread(consumer, productionQuantity, dict, pEvent, cEvent)
consumerThread.join()
producerThread.join()
}
两个并发线程访问公共资源。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
Dict()
函数用于创建一个字典对象,用于传递到并发的线程中。
Dict()
函数返回一个ThreadDict
对象。
ThreadDict
对象
Dict()
function threadFun1(obj) {
obj["age"] = 100
while (true) {
Log("threadFun1 obj:", obj)
Sleep(5000)
}
}
function threadFun2(obj) {
while (true) {
Log("threadFun2 obj:", obj)
Sleep(5000)
}
}
function main() {
var obj = {"age": 10}
var t1 = threading.Thread(threadFun1, obj)
var t2 = threading.Thread(threadFun2, obj)
t1.join()
t2.join()
}
给并发的线程执行函数传入普通的对象,测试修改对象的键值后是否引起其它线程中的对象键值变动。
function threadFun1(threadDict) {
threadDict.set("age", 100)
while (true) {
Log(`threadFun1 threadDict.get("age"):`, threadDict.get("age"))
Sleep(5000)
}
}
function threadFun2(threadDict) {
while (true) {
Log(`threadFun2 threadDict.get("age"):`, threadDict.get("age"))
Sleep(5000)
}
}
function main() {
var threadDict = threading.Dict()
threadDict.set("age", 10)
var t1 = threading.Thread(threadFun1, threadDict)
var t2 = threading.Thread(threadFun2, threadDict)
t1.join()
t2.join()
}
给并发的线程执行函数传入Dict()
函数创建的ThreadDict
对象,测试修改对象的键值后是否引起其它线程中的对象键值变动。
线程并发函数传入普通对象时为深拷贝传递,在并发线程中修改键值,并不会影响到其它线程中的字典。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
pending
函数用于获取当前策略程序正在运行的并发线程数。
pending()
函数返回当前策略程序正在运行的并发线程数。
number
pending()
function threadFun1() {
Log("threadFun1")
Sleep(3000)
}
function threadFun2() {
for (var i = 0; i < 3; i++) {
LogStatus(_D(), "print from threadFun2")
Sleep(3000)
}
}
function main() {
Log(`begin -- threading.pending():`, threading.pending())
var t1 = threading.Thread(threadFun1)
var t2 = threading.Thread(threadFun2)
Log(`after threading.Thread -- threading.pending():`, threading.pending())
t1.join()
t2.join()
Log(`after thread.join -- threading.pending():`, threading.pending())
}
创建两个并发运行的线程,在不同时间节点调用pending()
函数。
策略main()
函数开始运行时直接调用pending()
函数会返回1,因为策略main()
函数所在的主线程也是一个pending中的线程。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/eventLoop eventLoop}
Futures Thread