luanhailiang 2023-09-25 14:59:37 +08:00
parent ca1333bafb
commit 27b6575ec9
2 changed files with 120 additions and 5 deletions

6
app.py
View File

@ -79,9 +79,9 @@ def worker(id,source,region=None,stream=False):
print("start video stream... ", fps, delay) print("start video stream... ", fps, delay)
while True: while True:
use = datetime.now().strftime('%Y-%m-%d %H:%M:%S') use = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if bgn != 0 and delay != 0 and time.time()*1000 - bgn < delay: if bgn != 0 and delay != 0 and time.time()*1000 - bgn < 2*delay:
print("sleep",delay - (time.time()*1000 - bgn)) print("sleep",2*delay - (time.time()*1000 - bgn))
time.sleep((delay - (time.time()*1000 - bgn))/1000) time.sleep((2*delay - (time.time()*1000 - bgn))/1000)
bgn = (time.time()*1000) bgn = (time.time()*1000)
ret, frame = cap.read() ret, frame = cap.read()
del(ret) del(ret)

119
show.py
View File

@ -24,17 +24,132 @@ def show():
text += f"| {i} : {ball} " text += f"| {i} : {ball} "
print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),text) print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),text)
return 'ok' return 'ok'
import re
@app.route('/mj',methods=['POST']) @app.route('/mj',methods=['POST'])
def mj(): def mj():
print(request.json) print(request.json)
return 'ok' return 'ok'
@app.route('/cq',methods=['POST']) @app.route('/cq',methods=['POST'])
def cq(): def cq():
print(request.json) # print(request.json)
if request.json["post_type"] != "message":
return 'ok'
if request.json["message_type"] != "group":
return 'ok'
if request.json["group_id"] != "116277572":
return 'ok'
print(request.json['raw_message'])
# 正则表达式来匹配URL
url_pattern = re.compile(r'url=(.*?)\]')
# 使用findall来找到所有的URL
urls = re.findall(url_pattern, request.json['raw_message'])
base64Array = []
# 输出找到的URL
for url in urls:
print(url)
# 从URL下载图片
response = requests.get(url)
response.raise_for_status() # 如果请求失败,此处将引发异常
# 将图片数据转换为Base64编码的字节数组
image_base64_bytes = base64.b64encode(response.content)
# 如果您需要将其转换为字符串,您可以使用.decode('utf-8')将字节数组转换为字符串
image_base64_str = image_base64_bytes.decode('utf-8')
# 打印Base64编码的字节数组
# print(image_base64_bytes)
base64Array.append('data:image/jpeg;base64,'+image_base64_str)
if len(base64Array) == 0:
return 'ok'
# 示例调用
params = {
'base64Array':base64Array,
'notifyHook': 'http://hk.luanhailiang.cn:5000/mj',
'prompt': 'game assets, game ui, animal, plant, cute, soft, baby, magical, forest, spirit, fairy',
'state': ''
}
response = imagine(params)
print(response.json())
return 'ok' return 'ok'
import base64
import requests
def imagine(params):
"""
提交Imagine任务
:param params: imagineDTO一个字典包含以下键:
- base64Array: 垫图base64数组
- notifyHook: 回调地址, 为空时使用全局notifyHook
- prompt: 提示词
- state: 自定义参数
:return: 返回服务器的响应
"""
# API端点
url = 'http://127.0.0.1:8080/mj/submit/imagine'
# 发送POST请求
response = requests.post(url, json=params)
# 如果需要,处理响应(例如,检查响应状态,解析响应主体等)
return response
if __name__ == '__main__': if __name__ == '__main__':
# # 这是包含URL的示例文本
# text = '[CQ:image,file=42c93be3dbe10bfa79ad93ce1baed561.image,subType=1,url=https://gchat.qpic.cn/gchatpic_new/1183464602/691534145-2408237736-42C93BE3DBE10BFA79AD93CE1BAED561/0?term=2&amp;is_origin=0][CQ:image,file=42c93be3dbe10bfa79ad93ce1baed561.image,subType=1,url=https://gchat.qpic.cn/gchatpic_new/1183464602/691534145-2408237736-42C93BE3DBE10BFA79AD93CE1BAED561/0?term=2&amp;is_origin=0]'
# # 正则表达式来匹配URL
# url_pattern = re.compile(r'url=(.*?)\]')
# # 使用findall来找到所有的URL
# urls = re.findall(url_pattern, text)
# # 输出找到的URL
# for url in urls:
# print(url)
# # 从URL下载图片
# response = requests.get(url)
# response.raise_for_status() # 如果请求失败,此处将引发异常
# # 将图片数据转换为Base64编码的字节数组
# image_base64_bytes = base64.b64encode(response.content)
# # 如果您需要将其转换为字符串,您可以使用.decode('utf-8')将字节数组转换为字符串
# image_base64_str = image_base64_bytes.decode('utf-8')
# # 打印Base64编码的字节数组
# # print(image_base64_bytes)
# # 示例调用
# params = {
# 'base64Array': ['data:image/jpeg;base64,'+image_base64_str],
# 'notifyHook': 'http://hk.luanhailiang.cn:5000/mj',
# 'prompt': 'character design, game assets, game ui, cute, soft',
# 'state': ''
# }
# response = imagine(params)
# print(response.json())
app.logger.setLevel(logging.ERROR) app.logger.setLevel(logging.ERROR)
app.run("0.0.0.0",port=8000,threaded=True) app.run("0.0.0.0",port=8000,threaded=True)