优宽量化交易平台从系统底层真正支持JavaScript
语言策略的多线程功能,实现了以下对象:
对象 | 说明 | 备注 |
---|---|---|
threading | 多线程全局对象 | 成员函数:Thread 、getThread 、mainThread 等。 |
Thread | 线程对象 | 成员函数:peekMessage 、postMessage 、join 等。 |
ThreadLock | 线程锁对象 | 成员函数:acquire 、release 。可作为线程执行函数的参数传入线程环境。 |
ThreadEvent | 事件对象 | 成员函数:set 、clear 、wait 、isSet 。可作为线程执行函数的参数传入线程环境。 |
ThreadCondition | 条件对象 | 成员函数:notify 、notifyAll 、wait 、acquire 、release 。可作为线程执行函数的参数传入线程环境。 |
ThreadDict | 字典对象 | 成员函数:get 、set 。可作为线程执行函数的参数传入线程环境。 |
threading
对象作为全局多线程管理工具,提供了创建并发线程、线程锁、条件对象等功能;本章节介绍threading
对象的成员函数;仅JavaScript
语言策略支持该对象。
Thread()
函数用于创建并发线程。
Thread()
函数返回一个Thread
对象,用于管理创建出的并发线程、线程通信等。
Thread
对象
Thread(func, …args) Thread(…items)
参数func
是用于并发执行的函数(通过引用传递),支持传入匿名函数。func
可接受多个参数,这些参数将在并发执行时通过...args
传入。因此,func
的参数列表需要与...args
一致。
func
true
function
参数arg
是在回调执行时传递给func
(即并发线程执行函数)的实际参数;参数arg
可能有多个,func
的参数列表需要与...args
一致。
arg
false
string、number、bool、object、array、function、空值等系统支持的所有类型
参数item
是一个数组,包含待并发执行的函数引用及其参数,调用Thread
函数时的参数item
可以传入多组。
item true array
function test1(a, b, c) {
Log("test1:", a, b, c)
}
function main() {
var t1 = threading.Thread(test1, 1, 2, 3)
var t2 = threading.Thread(function (msg) {
Log("msg:", msg)
}, "Hello thread2")
t1.join()
t2.join()
}
同时创建一个自定义函数和一个匿名函数的并发线程。
function test1(msg) {
Log("msg:", msg)
test2("Hello test2")
}
function main() {
var t1 = threading.Thread(
[function(a, b, c) {Log(a, b, c)}, 1, 2, 3],
[test1, "Hello test1"],
[`function test2(msg) {Log("msg:", msg)}`])
t1.join()
}
使用Thread(...items)
形式来创建并发线程,顺序执行多个函数。
function ml(input) {
const net = new brain.NeuralNetwork()
net.train([
{ input: [0, 0], output: [0] },
{ input: [0, 1], output: [1] },
{ input: [1, 0], output: [1] },
{ input: [1, 1], output: [0] },
])
return net.run(input)
}
function main() {
var ret = threading.Thread([ml, [1, 0]], [HttpQuery("https://unpkg.com/brain.js")]).join()
// ret: {"id":1,"terminated":false,"elapsed":337636000,"ret":{"0":0.9339330196380615}}
Log(ret)
}
支持传入函数字符串,可以动态导入外部库进行并发计算。
传入Thread()
函数用于并发执行的线程函数func
运行在隔离环境中,因此无法直接引用线程外部的变量,引用时会编译失败。同时,线程内不支持引用其它闭包函数。线程内部可调用平台提供的所有API,但不能调用用户自定义的其他函数。
支持回测系统、实盘环境;所有并发线程相关的函数,在回测系统中仅作为代码兼容支持,实际不会真正并发线程执行,本章不再赘述。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
getThread()
函数用于根据指定的线程Id获取线程对象。
getThread()
函数返回通过参数指定threadId的Thread
对象
Thread
对象
getThread(threadId)
参数threadId
为线程对象Id,通过指定参数获取对应的线程对象。
threadId true number
function main() {
var t1 = threading.Thread(function () {
Log("Hello thread1")
})
// Thread 对象有方法:id(),用于获取线程的Id,可以查看文档对应Thread对象的章节
var threadId = t1.id()
var threadName = t1.name()
Log("threadId:", threadId, ", threadName:", threadName)
var t2 = threading.getThread(threadId)
Log(`threadId == t2.id():`, threadId == t2.id(), `, threadName == t2.name():`, threadName == t2.name())
}
通过threadId
获取指定的线程对象。
支持回测系统、实盘环境。
如果期望获取的线程已经执行完毕、释放,此时无法通过threading.getThread(threadId)
获取该线程的线程对象。
{@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
mainThread()
函数用于获取主线程的线程对象,即策略中main()
函数所在的线程。
mainThread()
函数返回主线程的线程对象。
Thread
对象
mainThread()
function main() {
Log("主线程的threadId:", threading.mainThread().id())
}
获取主线程的Thread
对象,输出主线程的threadId
。
function test() {
Log("test函数中输出主线程Id:", threading.mainThread().id())
}
function main() {
var t1 = threading.Thread(test)
t1.join()
}
在并发的线程中也可以获取主线程的线程对象。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
currentThread()
函数用于获取当前线程的线程对象。
currentThread()
函数返回当前线程的线程对象。
Thread
对象
currentThread()
function test() {
Log("当前线程的Id:", threading.currentThread().id())
}
function main() {
var t1 = threading.Thread(test)
t1.join()
}
获取当前线程的Thread
对象,输出当前线程的threadId
。
支持回测系统、实盘环境。
{@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
Lock()
函数用于创建线程锁对象。
Lock()
函数返回一个线程锁对象。
ThreadLock
对象
Lock()
function consumer(productionQuantity, dict, lock) {
for (var i = 0; i < productionQuantity; i++) {
lock.acquire()
var count = dict.get("count")
Log("consumer:", count)
Sleep(1000)
lock.release()
}
}
function producer(productionQuantity, dict, lock) {
for (var i = 0; i < productionQuantity; i++) {
lock.acquire()
dict.set("count", i)
Log("producer:", i)
Sleep(1000)
lock.release()
}
}
function main() {
var dict = threading.Dict()
dict.set("count", -1)
var lock = threading.Lock()
var productionQuantity = 10
var producerThread = threading.Thread(producer, productionQuantity, dict, lock)
var consumerThread = threading.Thread(consumer, productionQuantity, dict, lock)
consumerThread.join()
producerThread.join()
}
两个并发线程访问公共资源。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
Condition()
函数用于创建一个条件变量对象,该对象用于在多线程并发环境中实现线程间的同步与通信。通过Condition()
,一个线程可以在某些条件未满足时等待,直到另一个线程通知它条件已经满足。
Condition()
函数返回一个ThreadCondition
对象。
ThreadCondition
对象
Condition()
function consumer(productionQuantity, dict, condition) {
for (var i = 0; i < productionQuantity; i++) {
condition.acquire()
while (dict.get("array").length == 0) {
condition.wait()
}
var arr = dict.get("array")
var count = arr.shift()
dict.set("array", arr)
Log("consumer:", count, ", array:", arr)
condition.release()
Sleep(1000)
}
}
function producer(productionQuantity, dict, condition) {
for (var i = 0; i < productionQuantity; i++) {
condition.acquire()
var arr = dict.get("array")
arr.push(i)
dict.set("array", arr)
Log("producer:", i, ", array:", arr)
condition.notify()
condition.release()
Sleep(1000)
}
}
function main() {
var dict = threading.Dict()
dict.set("array", [])
var condition = threading.Condition()
var productionQuantity = 10
var producerThread = threading.Thread(producer, productionQuantity, dict, condition)
var consumerThread = threading.Thread(consumer, productionQuantity, dict, condition)
consumerThread.join()
producerThread.join()
}
两个并发线程访问公共资源。
回测系统没有实现该功能,仅仅是定义。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
Event()
函数用于创建一个线程事件对象,该对象用于线程间的同步,允许一个线程等待另一个线程的通知或信号。
Event()
函数返回一个ThreadEvent
对象。
ThreadEvent
对象
Event()
function consumer(productionQuantity, dict, pEvent, cEvent) {
for (var i = 0; i < productionQuantity; i++) {
while (dict.get("array").length == 0) {
pEvent.wait()
}
if (pEvent.isSet()) {
pEvent.clear()
}
var arr = dict.get("array")
var count = arr.shift()
dict.set("array", arr)
Log("consumer:", count, ", array:", arr)
cEvent.set()
Sleep(1000)
}
}
function producer(productionQuantity, dict, pEvent, cEvent) {
for (var i = 0; i < productionQuantity; i++) {
while (dict.get("array").length != 0) {
cEvent.wait()
}
if (cEvent.isSet()) {
cEvent.clear()
}
var arr = dict.get("array")
arr.push(i)
dict.set("array", arr)
Log("producer:", i, ", array:", arr)
pEvent.set()
Sleep(1000)
}
}
function main() {
var dict = threading.Dict()
dict.set("array", [])
var pEvent = threading.Event()
var cEvent = threading.Event()
var productionQuantity = 10
var producerThread = threading.Thread(producer, productionQuantity, dict, pEvent, cEvent)
var consumerThread = threading.Thread(consumer, productionQuantity, dict, pEvent, cEvent)
consumerThread.join()
producerThread.join()
}
两个并发线程访问公共资源。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
Dict()
函数用于创建一个字典对象,用于传递到并发的线程中。
Dict()
函数返回一个ThreadDict
对象。
ThreadDict
对象
Dict()
function threadFun1(obj) {
obj["age"] = 100
while (true) {
Log("threadFun1 obj:", obj)
Sleep(5000)
}
}
function threadFun2(obj) {
while (true) {
Log("threadFun2 obj:", obj)
Sleep(5000)
}
}
function main() {
var obj = {"age": 10}
var t1 = threading.Thread(threadFun1, obj)
var t2 = threading.Thread(threadFun2, obj)
t1.join()
t2.join()
}
给并发的线程执行函数传入普通的对象,测试修改对象的键值后是否引起其它线程中的对象键值变动。
function threadFun1(threadDict) {
threadDict.set("age", 100)
while (true) {
Log(`threadFun1 threadDict.get("age"):`, threadDict.get("age"))
Sleep(5000)
}
}
function threadFun2(threadDict) {
while (true) {
Log(`threadFun2 threadDict.get("age"):`, threadDict.get("age"))
Sleep(5000)
}
}
function main() {
var threadDict = threading.Dict()
threadDict.set("age", 10)
var t1 = threading.Thread(threadFun1, threadDict)
var t2 = threading.Thread(threadFun2, threadDict)
t1.join()
t2.join()
}
给并发的线程执行函数传入Dict()
函数创建的ThreadDict
对象,测试修改对象的键值后是否引起其它线程中的对象键值变动。
线程并发函数传入普通对象时为深拷贝传递,在并发线程中修改键值,并不会影响到其它线程中的字典。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
pending
函数用于获取当前策略程序正在运行的并发线程数。
pending()
函数返回当前策略程序正在运行的并发线程数。
number
pending()
function threadFun1() {
Log("threadFun1")
Sleep(3000)
}
function threadFun2() {
for (var i = 0; i < 3; i++) {
LogStatus(_D(), "print from threadFun2")
Sleep(3000)
}
}
function main() {
Log(`begin -- threading.pending():`, threading.pending())
var t1 = threading.Thread(threadFun1)
var t2 = threading.Thread(threadFun2)
Log(`after threading.Thread -- threading.pending():`, threading.pending())
t1.join()
t2.join()
Log(`after thread.join -- threading.pending():`, threading.pending())
}
创建两个并发运行的线程,在不同时间节点调用pending()
函数。
策略main()
函数开始运行时直接调用pending()
函数会返回1,因为策略main()
函数所在的主线程也是一个pending中的线程。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/eventLoop eventLoop}
Thread
对象可以由threading.Thread()
、threading.getThread()
、threading.mainThread()
、threading.currentThread()
创建或者返回。
peekMessage()
函数用于从线程获取消息。
peekMessage()
函数返回当前线程对象关联的线程收到的消息。
string、number、bool、object、array、空值等系统支持的所有类型
peekMessage() peekMessage(timeout)
参数timeout
为超时设置,会按照该参数设置的毫秒数阻塞等待,返回数据;没有数据并且超时则返回空值。如果timeout
设置为0或者不传timeout
参数,则表示一直阻塞等待,直到接收到通道中的数据。如果timeout
设置为-1,则表示不阻塞并且立即返回数据,没有数据时返回空值。
timeout false number
function main() {
var t1 = threading.Thread(function() {
for (var i = 0; i < 10; i++) {
Log("thread1 postMessage():", i)
threading.mainThread().postMessage(i)
Sleep(500)
}
})
while (true) {
var msg = threading.currentThread().peekMessage()
Log("main peekMessage():", msg)
if (msg == 9) {
break
}
Sleep(1000)
}
t1.join()
}
并发线程中向主线程发送消息。
在编写程序时需要注意线程死锁问题。
{@fun/Threads/Thread/postMessage postMessage}, {@fun/Threads/Thread/join join}, {@fun/Threads/Thread/terminate terminate}, {@fun/Threads/Thread/getData getData}, {@fun/Threads/Thread/setData setData}, {@fun/Threads/Thread/id id}, {@fun/Threads/Thread/name name}, {@fun/Threads/Thread/eventLoop eventLoop}
postMessage()
函数用于向线程发送消息。
postMessage(msg)
参数msg
为所要发送的消息。
msg true string、number、bool、object、array、空值等系统支持的任意类型
function main() {
var t1 = threading.Thread(function() {
for (var i = 0; i < 10; i++) {
Log("thread1 postMessage():", i)
threading.mainThread().postMessage(i)
Sleep(500)
}
})
for (var i = 0; i < 10; i++) {
var event = threading.mainThread().eventLoop()
Log("main event:", event)
Sleep(500)
}
t1.join()
}
在并发的线程中发送消息,使用eventLoop()
接收消息通知。
当一个线程的执行函数中调用postMessage()
函数发出信号、数据时,也会产生消息事件。 可以用eventLoop()
函数收到消息通知。
{@fun/Threads/Thread/peekMessage peekMessage}, {@fun/Threads/Thread/join join}, {@fun/Threads/Thread/terminate terminate}, {@fun/Threads/Thread/getData getData}, {@fun/Threads/Thread/setData setData}, {@fun/Threads/Thread/id id}, {@fun/Threads/Thread/name name}, {@fun/Threads/Thread/eventLoop eventLoop}
join()
函数用于等待线程退出,并回收系统资源。
ThreadRet
对象包含执行结果的相关数据,包含的属性如下:
ThreadRet
对象
join() join(timeout)
timeout
参数用于设置等待线程结束的超时时间,单位为毫秒。timeout
参数设置为0或者不设置timeout
参数时join()
函数会阻塞,等待到线程执行结束。timeout
参数设置为-1时,join()
函数会立即返回。
timeout false number
function main() {
var t1 = threading.Thread(function() {
Log("Hello thread1")
Sleep(5000)
})
var ret = t1.join(1000)
Log("ret:", ret) // ret: undefined
ret = t1.join()
Log("ret:", ret) // ret: {"id":1,"terminated":false,"elapsed":5003252000}
}
测试join()
函数超时,输出返回值。
join()
函数超时,返回undefined
。
{@fun/Threads/Thread/peekMessage peekMessage}, {@fun/Threads/Thread/postMessage postMessage}, {@fun/Threads/Thread/terminate terminate}, {@fun/Threads/Thread/getData getData}, {@fun/Threads/Thread/setData setData}, {@fun/Threads/Thread/id id}, {@fun/Threads/Thread/name name}, {@fun/Threads/Thread/eventLoop eventLoop}
terminate()
函数用于强制结束线程,释放创建线程使用的硬件资源。
terminate()
function main() {
var t1 = threading.Thread(function() {
for (var i = 0; i < 10; i++) {
Log("thread1 i:", i)
Sleep(1000)
}
})
Sleep(3000)
t1.terminate()
Log("after t1.terminate()")
while (true) {
LogStatus(_D())
Sleep(1000)
}
}
强制终止一个线程的执行,在强制终止一个线程之后,日志中不再有这个线程输出的内容。
对于terminate()
函数强制结束的线程,无法再使用join()
函数等待结束。
{@fun/Threads/Thread/peekMessage peekMessage}, {@fun/Threads/Thread/postMessage postMessage}, {@fun/Threads/Thread/join join}, {@fun/Threads/Thread/getData getData}, {@fun/Threads/Thread/setData setData}, {@fun/Threads/Thread/id id}, {@fun/Threads/Thread/name name}, {@fun/Threads/Thread/eventLoop eventLoop}
getData()
函数用于访问线程环境中记录的变量。数据在线程没有被执行join()
函数(等待退出成功)并且没有被执行terminate()
函数(强制终止线程)的情况下有效。
getData()
函数返回当前线程环境中储存的键-值对中key
参数对应的键值。
string、number、bool、object、array、空值等系统支持的所有类型
getData() getData(key)
key
参数为储存的键-值对的键名。
key true string
function main() {
var t1 = threading.Thread(function() {
for (var i = 0; i < 5; i++) {
threading.currentThread().setData("count", i)
Log(`setData("count"):`, i)
Sleep(1000)
}
})
for (var i = 0; i < 5; i++) {
var count = threading.getThread(t1.id()).getData("count")
Log(`getData("count"):`, count)
Sleep(1000)
}
t1.join()
}
在并发线程的环境中记录键名为count
的值,然后在主线程中读取count
的键值。
{@fun/Threads/Thread/peekMessage peekMessage}, {@fun/Threads/Thread/postMessage postMessage}, {@fun/Threads/Thread/join join}, {@fun/Threads/Thread/terminate terminate}, {@fun/Threads/Thread/setData setData}, {@fun/Threads/Thread/id id}, {@fun/Threads/Thread/name name}, {@fun/Threads/Thread/eventLoop eventLoop}
setData()
函数用于在线程环境储存变量。
setData(key, value)
key
参数用于指定储存的键-值对的键名。
key
true
string
value
参数用于指定储存的键-值对的键值。
value true string、number、bool、object、array、空值等系统支持的任意类型
function main() {
var t1 = threading.Thread(function() {
threading.currentThread().setData("data", 100)
})
Sleep(1000)
Log(`t1.getData("data"):`, t1.getData("data"))
t1.join()
}
并发的线程中设置键值对,主线程中读取这个键值对。
数据在线程没有被执行join()
函数(等待退出成功)并且没有被执行terminate()
函数(强制终止线程)的情况下有效。参数value
的值必须是可序列化的变量。
{@fun/Threads/Thread/peekMessage peekMessage}, {@fun/Threads/Thread/postMessage postMessage}, {@fun/Threads/Thread/join join}, {@fun/Threads/Thread/terminate terminate}, {@fun/Threads/Thread/getData getData}, {@fun/Threads/Thread/id id}, {@fun/Threads/Thread/name name}, {@fun/Threads/Thread/eventLoop eventLoop}
id()
函数用于返回当前多线程对象实例的threadId
。
id()
函数的返回值为threadId
。
number
id()
function main() {
var t1 = threading.Thread(function() {
threading.currentThread().setData("data", 100)
})
Log(`t1.id():`, t1.id())
t1.join()
}
创建一个并发运行的线程,在主线程输出这个并发线程的threadId
。
{@fun/Threads/Thread/peekMessage peekMessage}, {@fun/Threads/Thread/postMessage postMessage}, {@fun/Threads/Thread/join join}, {@fun/Threads/Thread/terminate terminate}, {@fun/Threads/Thread/getData getData}, {@fun/Threads/Thread/setData setData}, {@fun/Threads/Thread/name name}, {@fun/Threads/Thread/eventLoop eventLoop}
name()
函数用于返回当前多线程对象实例的名称。
name()
函数返回值为并发的线程名称。
string
name()
function main() {
var t1 = threading.Thread(function() {
threading.currentThread().setData("data", 100)
})
Log(`t1.name():`, t1.name()) // t1.name(): Thread-1
t1.join()
}
创建一个并发运行的线程,在主线程输出这个并发线程的名称。
{@fun/Threads/Thread/peekMessage peekMessage}, {@fun/Threads/Thread/postMessage postMessage}, {@fun/Threads/Thread/join join}, {@fun/Threads/Thread/terminate terminate}, {@fun/Threads/Thread/getData getData}, {@fun/Threads/Thread/setData setData}, {@fun/Threads/Thread/id id}, {@fun/Threads/Thread/eventLoop eventLoop}
eventLoop()
函数用于监听线程收到的事件。
eventLoop()
函数返回当前线程收到的事件信息,查看事件信息结构。
object、空值
eventLoop() eventLoop(timeout)
参数timeout
为超时设置,单位为毫秒。参数timeout
如果设置为0则等待有事件发生才返回,如果大于0就是设置事件等待超时,小于0立即返回最近事件。
timeout false number
function main() {
var t1 = threading.Thread(function() {
while (true) {
var eventMsg = threading.currentThread().eventLoop() // 阻塞等待
// 2024-11-14 10:14:18 thread1 eventMsg: {"Seq":1,"Event":"thread","ThreadId":0,"Index":1,"Queue":0,"Nano":1731550458699947000}
Log(_D(), "thread1 eventMsg:", eventMsg)
}
})
var t2 = threading.Thread(function() {
while (true) {
var eventMsg = threading.currentThread().eventLoop(-1) // 立即返回
Log(_D(), "thread2 eventMsg:", eventMsg)
Sleep(5000)
}
})
var t3 = threading.Thread(function() {
while (true) {
var eventMsg = threading.currentThread().eventLoop(3000) // 设置3秒超时
Log(_D(), "thread3 eventMsg:", eventMsg)
}
})
t1.postMessage("Hello ", t1.name())
t2.postMessage("Hello ", t2.name())
t3.postMessage("Hello ", t3.name())
t1.join()
t2.join()
t3.join()
}
并发执行3个线程,输出接收到的事件信息,超时或者立即返回时输出的是空值。
eventLoop()
函数的处理机制与全局函数EventLoop()
一致。
{@fun/Threads/Thread/peekMessage peekMessage}, {@fun/Threads/Thread/postMessage postMessage}, {@fun/Threads/Thread/join join}, {@fun/Threads/Thread/terminate terminate}, {@fun/Threads/Thread/getData getData}, {@fun/Threads/Thread/setData setData}, {@fun/Threads/Thread/id id}, {@fun/Threads/Thread/name name}
线程锁对象,用于多线程同步处理。
acquire()
函数用于请求一个线程锁(加锁)。
acquire()
范例可以参考threading.Lock()
章节的内容。
acquire()
函数用于请求一个线程锁。当一个线程调用某个线程锁对象的acquire()
函数时,它会尝试获取锁,如果锁当前没有被其它线程占用,调用线程就会成功获得锁并继续执行。如果锁已经被其它线程持有,调用acquire()
的线程会被阻塞,直到锁被释放。
{@fun/Threads/threading/Lock Lock}, {@fun/Threads/ThreadLock/release release}
release()
函数用于释放一个线程锁(解锁)。
release()
function consumer(productionQuantity, dict, pLock, cLock) {
for (var i = 0; i < productionQuantity; i++) {
pLock.acquire()
cLock.acquire()
var arr = dict.get("array")
var count = arr.shift()
dict.set("array", arr)
Log("consumer:", count, ", array:", arr)
cLock.release()
Sleep(1000)
pLock.release()
}
}
function producer(productionQuantity, dict, pLock, cLock) {
for (var i = 0; i < productionQuantity; i++) {
cLock.acquire() // cLock.acquire() 放在 pLock.acquire() 后不会产生死锁
pLock.acquire()
var arr = dict.get("array")
arr.push(i)
dict.set("array", arr)
Log("producer:", i, ", array:", arr)
pLock.release()
Sleep(1000)
cLock.release()
}
}
function main() {
var dict = threading.Dict()
dict.set("array", [])
var pLock = threading.Lock()
var cLock = threading.Lock()
var productionQuantity = 10
var producerThread = threading.Thread(producer, productionQuantity, dict, pLock, cLock)
var consumerThread = threading.Thread(consumer, productionQuantity, dict, pLock, cLock)
consumerThread.join()
producerThread.join()
}
测试死锁场景
需要注意,线程锁使用不当可能导致死锁。
{@fun/Threads/threading/Lock Lock}, {@fun/Threads/ThreadLock/acquire acquire}
事件对象,用于多线程事件通知、信号。
set()
函数用于通知事件(设置信号)。
set()
可以参考threading.Event()
章节范例。
如果已经set()
设置过,不能重复设置,需要清空之后重新设置信号。
{@fun/Threads/ThreadEvent/clear clear}, {@fun/Threads/ThreadEvent/wait wait}, {@fun/Threads/ThreadEvent/isSet isSet}
clear()
函数用于清理信号。
clear()
可以参考threading.Event()
章节范例。
{@fun/Threads/ThreadEvent/set set}, {@fun/Threads/ThreadEvent/wait wait}, {@fun/Threads/ThreadEvent/isSet isSet}
wait()
函数用于设置事件(信号)等待,在事件(信号)被设置之前会阻塞;支持设置超时参数。
wait()
函数返回是否超时,如果超时返回真值。
bool
wait() wait(timeout)
参数timeout
用于设置等待超时,单位毫秒。
timeout false number
function main() {
var event = threading.Event()
var t1 = threading.Thread(function(event) {
var ret = event.wait(100)
Log(`event.wait(100):`, ret)
ret = event.wait()
Log(`event.wait():`, ret)
}, event)
Sleep(1000)
event.set()
t1.join()
}
测试wait()
函数的返回值。
{@fun/Threads/ThreadEvent/set set}, {@fun/Threads/ThreadEvent/clear clear}, {@fun/Threads/ThreadEvent/isSet isSet}
isSet()
函数用于判断是否已经设置了事件(信号)。
isSet()
函数返回是否已经设置了事件(信号);如果当前已经设置过事件(信号)则返回真值。
bool
isSet()
可以参考threading.Event()
章节范例。
{@fun/Threads/ThreadEvent/set set}, {@fun/Threads/ThreadEvent/clear clear}, {@fun/Threads/ThreadEvent/wait wait}
条件对象,用于多线程同步。
notify()
函数用于唤醒一个正在等待的线程(如果有的话)。只有调用了wait()
方法的线程才会被唤醒。
notify()
function consumer(dict, condition) {
while (true) {
condition.acquire()
while (dict.get("array").length == 0) {
Log(threading.currentThread().name(), "wait()...", ", array:", dict.get("array"))
condition.wait()
}
var arr = dict.get("array")
var num = arr.shift()
Log(threading.currentThread().name(), ", num:", num, ", array:", arr, "#FF0000")
dict.set("array", arr)
Sleep(1000)
condition.release()
}
}
function main() {
var condition = threading.Condition()
var dict = threading.Dict()
dict.set("array", [])
var t1 = threading.Thread(consumer, dict, condition)
var t2 = threading.Thread(consumer, dict, condition)
var t3 = threading.Thread(consumer, dict, condition)
Sleep(1000)
var i = 0
while (true) {
condition.acquire()
var msg = ""
var arr = dict.get("array")
var randomNum = Math.floor(Math.random() * 5) + 1
if (arr.length >= 3) {
condition.notifyAll()
msg = "notifyAll"
} else {
arr.push(i)
dict.set("array", arr)
if (randomNum > 3 && arr.length > 0) {
condition.notify()
msg = "notify"
} else {
msg = "pass"
}
i++
}
Log(_D(), "randomNum:", randomNum, ", array:", arr, ", msg:", msg)
condition.release()
Sleep(1000)
}
}
使用notify()
函数唤醒等待中的线程。
notify()
函数会唤醒处于等待队列中的一个线程。
notify()
函数唤醒一个线程时,这个线程会重新获取线程锁。
{@fun/Threads/ThreadCondition/notifyAll notifyAll}, {@fun/Threads/ThreadCondition/wait wait}, {@fun/Threads/ThreadCondition/acquire acquire}, {@fun/Threads/ThreadCondition/release release}
notifyAll()
函数会唤醒所有正在等待的线程。
notifyAll()
范例可以参考ThreadCondition.notify()
章节的内容。
notifyAll()
函数逐个唤醒所有等待中的线程,被唤醒的线程重新获取线程锁。
{@fun/Threads/ThreadCondition/notify notify}, {@fun/Threads/ThreadCondition/wait wait}, {@fun/Threads/ThreadCondition/acquire acquire}, {@fun/Threads/ThreadCondition/release release}
wait()
函数用于在某些设计的条件下让线程等待。
wait()
范例可以参考ThreadCondition.notify()
章节的内容。
wait()
函数会释放线程锁,当被唤醒时重新获取线程锁。
{@fun/Threads/ThreadCondition/notify notify}, {@fun/Threads/ThreadCondition/notifyAll notifyAll}, {@fun/Threads/ThreadCondition/acquire acquire}, {@fun/Threads/ThreadCondition/release release}
acquire()
函数用于请求一个线程锁(加锁)。
acquire()
范例可以参考ThreadCondition.notify()
章节的内容。
在使用wait()
之前需要请求当前条件对象的线程锁(加锁)。
{@fun/Threads/ThreadCondition/notify notify}, {@fun/Threads/ThreadCondition/notifyAll notifyAll}, {@fun/Threads/ThreadCondition/wait wait}, {@fun/Threads/ThreadCondition/release release}
release()
函数用于释放一个线程锁(解锁)。
release()
范例可以参考ThreadCondition.notify()
章节的内容。
在使用wait()
之后需要释放当前条件对象的线程锁(解锁)。
{@fun/Threads/ThreadCondition/notify notify}, {@fun/Threads/ThreadCondition/notifyAll notifyAll}, {@fun/Threads/ThreadCondition/wait wait}, {@fun/Threads/ThreadCondition/acquire acquire}
字典对象,用于数据共享。
get()
函数用于获取字典对象中记录的键值。
get()
函数返回通过参数key
指定的键值。
string、number、bool、object、array、空值等系统支持的所有类型
get(key)
参数key
用于指定要获取的键对应的键名。
key true string
function main() {
var event = threading.Event()
var dict = threading.Dict()
dict.set("data", 100)
var t1 = threading.Thread(function(dict, event) {
Log(`thread1, dict.get("data"):`, dict.get("data"))
event.set()
event.clear()
event.wait()
Log(`after main change data, thread1 dict.get("data"):`, dict.get("data"))
dict.set("data", 0)
}, dict, event)
event.wait()
dict.set("data", 99)
event.set()
event.clear()
t1.join()
Log(`main thread, dict.get("data"):`, dict.get("data"))
}
使用事件对象通知线程读取、修改数据。
{@fun/Threads/ThreadDict/set set}
set()
函数用于设置键值对。
set(key, value)
参数key
用于设置需要修改的键名。
key
true
string
参数value
用于设置需要修改的键值。
value true string、number、bool、object、array、空值等系统支持的所有类型
范例可以参考ThreadDict.get()
章节的内容。
{@fun/Threads/ThreadDict/get get}
Futures TA