现实交易中为了能及时了解优宽量化机器人交易状态,有时候我们需要将机器人所执行的交易结果发送到微信、邮箱、短信等等。但每天上百条各种各样的信息,使得对这些信息已经不敏感,导致重要的信息不能及时查收,所以本篇通过调用钉钉群接口实现机器人推送消息。
钉钉群机器人是一个高级扩展功能,只要有一个钉钉账号,就可以使用它。它可以将第三方信息聚合到钉钉群中,实现信息自动同步。支持Webhook协议的自定义接入,通过优宽量化机器人,将提醒、报警等信息聚合到钉钉群中。支持文本(text)、链接(link)、markdown三种消息格式,五种消息类型。同一条信息还可以同时发送至多个钉钉群。 参考官方链接:https://ding-doc.dingtalk.com/doc#/serverapi2/ye8tup
第1步:创建钉钉群 钉钉群每创建一个自定义机器人都会产生唯一的Hook地址,我们称为WebHook地址,通过向该WebHook地址推送消息,钉钉群就会收到消息。我们以PC端钉钉为例,首先点击左上方“+”号发起群聊,如果只想自己接受消息,可以随便拉两个人再踢出去,填写群名称:“优宽机器人”,群类型选择普通群即可。
第2步:添加钉钉群机器人 点击头像,选择机器人管理,然后选择自定义,点击添加。自定义机器人名字:“优宽”,添加到刚刚创建的钉钉群。机器人支持三种安全设置:
如果只用于提醒或报警,选择自定义关键词就可以了。在这里我们定义的关键词是“:”,也就是说当优宽量化机器人推送的信息中包含“:”时,这条信息才会推送到钉钉群中。然后点击同意协议完成。最后复制Webhook地址备用。
在获取到Webhook地址后,我们可以在优宽量化策略中向这个地址发起HTTP POST请求,就可以给这个钉钉群发送信息。需要注意的是,在发起POST请求时,必须将字符集编码设置成UTF-8。
import requests
import json
from datetime import datetime, timedelta, timezone
# 向钉钉群输出信息
def msg(text):
token ="0303627a118e739e628bcde104e19cf5463f61a4a127e4f2376e6a8aa1156ef1"
headers = {'Content-Type': 'application/json;charset=utf-8'} # 请求头
api_url = f"https://oapi.dingtalk.com/robot/send?access_token={token}"
json_text = {
"msgtype": "text", # 信息格式
"text": {
"content": text
}
}
# 发送并打印信息
Log(requests.post(api_url, json.dumps(json_text), headers=headers).content)
# 测试函数
def onTick():
arr = ['BTC', 'ETH', 'XRP', 'BCH', 'LTC'] # 主流数字货币
# 获取东八区时间
bj_dt = str(datetime.now().astimezone(timezone(timedelta(hours=8))))
bj_dt = bj_dt.split('.')[0] # 处理时间
text = f'{bj_dt}\n' # 定义信息内容
for i in arr: # 循环主流数字货币数组
exchange.IO("currency", f"{i}_USDT") # 切换交易对
ticker = exchange.GetTicker().Last # 获取最新价格
if i == 'LTC':
full = ' :'
else:
full = ':'
text = text + f"{i}/USDT{full}${ticker}\n" # 处理信息内容
msg(text) # 调用msg函数,输出信息
# 策略入口
def main():
while True: # 进入无线循环
onTick() # 执行onTick函数
Sleep(1000 * 60) # 休眠一分钟
自定义机器人在同步信息时,可以通过设置手机号码@多个群内成员。被@群成员在收到该信息时,会有@消息提醒,即使设置了免打扰会话仍然会通知提醒。
# 向钉钉群输出信息
def msg(text):
token = "0303627a118e739e628bcde104e19cf5463f61a4a127e4f2376e6a8aa1156ef1"
headers = {'Content-Type': 'application/json;charset=utf-8'} # 请求头
api_url = f"https://oapi.dingtalk.com/robot/send?access_token={token}"
json_text = {
"msgtype": "text", # 信息格式
"text": {
"content": text
},
"at": {
"atMobiles": [
"16666666666", # 被@的手机号码
"18888888888" # 被@的手机号码
],
"isAtAll": False # 不@所有人
}
}
# 发送并打印信息
Log(requests.post(api_url, json.dumps(json_text), headers=headers).content)
以上代码我们写了一个案例,每隔一分钟获取主流数字货币的价格,并且把这些信息推送到钉钉群中:
import requests import json from datetime import datetime, timedelta, timezone # 向钉钉群输出信息 def msg(text): token = "3e86657069f08b81f97fd558037d280ddca887cc3ae07a11bb28aed872dc746f" headers = {'Content-Type': 'application/json;charset=utf-8'} # 请求头 api_url = f"https://oapi.dingtalk.com/robot/send?access_token={token}" json_text = { "msgtype": "text", # 信息格式 "text": { "content": text }, "at": { "atMobiles": [ "16666666666", # 被@的手机号码 "18888888888" # 被@的手机号码 ], "isAtAll": False # 不@所有人 } } # 发送并打印信息 Log(requests.post(api_url, json.dumps(json_text), headers=headers).content) # 测试函数 def onTick(): arr = ['BTC', 'ETH', 'XRP', 'BCH', 'LTC'] # 主流数字货币 # 获取东八区时间 bj_dt = str(datetime.now().astimezone(timezone(timedelta(hours=8)))) bj_dt = bj_dt.split('.')[0] # 处理时间 text = f'{bj_dt}\n' # 定义信息内容 for i in arr: # 循环主流数字货币数组 exchange.IO("currency", f"{i}_USDT") # 切换交易对 ticker = exchange.GetTicker().Last # 获取最新价格 if i == 'LTC': full = ' :' else: full = ':' text = text + f"{i}/USDT{full}${ticker}\n" # 处理信息内容 msg(text) # 调用msg函数,输出信息 # 策略入口 def main(): while True: # 进入无线循环 onTick() # 执行onTick函数 Sleep(1000 * 60) # 休眠