billiard/app.py

204 lines
6.9 KiB
Python

import cv2
import json
import time
import torch
import requests
import threading
import numpy as np
import supervision as sv
from datetime import datetime
from ultralytics import YOLO
from flask import Flask,request,Response
from multiprocessing import Process, Event
app = Flask(__name__)
workers = {}
@app.route('/start/<id>')
def start(id):
table = workers.get(id)
if table is not None:
table.set()
source = request.args.get('source')
target = request.args.get('target')
region = request.args.get('region')
if target is None:
return "please input target url"
return Response(worker(id,source,region,True), mimetype='text/event-stream')
stop_event = Event()
p = Process(target=workerloop, args=(stop_event,id,source,target,region))
p.start()
workers[id] = stop_event
return 'ok'
@app.route('/stop/<id>')
def stop(id):
table = workers.get(id)
if table is not None:
table.set()
return 'ok'
@app.route('/show',methods=['POST'])
def show():
# print(request.json)
data = request.json["content"]
info(data)
return 'ok'
def info(data):
balls = data["balls"]
table = data["table"]
text = f"table:{table:>3} "
for i in range(16):
ball = "#" if "ball"+str(i) in balls else " "
text += f"| {i} : {ball} "
print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),text)
def worker(id,source,region=None,stream=False):
if region == None:
region = 10
else:
region = int(region)
print("start loading model...",id,source,region)
model = YOLO('./best.pt')
print("start loaded model!!!")
balls = {}
count = 0
delay = 0
bgn = 0
try:
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print("Error opening video stream.")
if source.endswith(".mp4"):
fps = cap.get(cv2.CAP_PROP_FPS)
delay = 1000.0 / fps
print("start video stream... ", fps, delay)
while True:
use = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if bgn != 0 and delay != 0 and time.time()*1000 - bgn < 2*delay:
print("sleep",2*delay - (time.time()*1000 - bgn))
time.sleep((2*delay - (time.time()*1000 - bgn))/1000)
bgn = (time.time()*1000)
ret, frame = cap.read()
del(ret)
del(frame)
ret, frame = cap.read()
use += " read:"+str((time.time()*1000) - bgn)
if not ret:
print("Error read video stream.")
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print("Error opening video stream..")
cap = cv2.VideoCapture(source)
time.sleep(1)
if not cap.isOpened():
print("Error opening video stream...")
time.sleep(3)
if not cap.isOpened():
print("Error opening video stream....")
break
continue
result = model.track(frame,device=int(id)%4,tracker='botsort.yaml')
use += " track:"+str((time.time()*1000) - bgn)
del(ret)
del(frame)
result = result[0]
detections = sv.Detections.from_yolov8(result)
if result.boxes.id is not None:
detections.tracker_id = result.boxes.id.cpu().numpy().astype(int)
else:
detections.tracker_id = np.array([])
detections.conf = np.array([])
detections.xyxy=np.empty((0, 4), dtype=np.float32)
# detections = detections[(detections.tracker_id != None)]
count += 1
names = {}
for xyxy,_, confidence, class_id, tracker_id in detections:
name = model.model.names[class_id]
if name in names:
print("Name duplicate",name)
continue
ball = balls.get(name,{})
ball["tkid"] = int(tracker_id)
ball["conf"] = round(float(confidence), 2)
ball["xyxy"] = [int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3])]
ball["count"] = ball.get("count",0) + 1
if region - count < 3:
ball["final"] = ball.get("final",0) + 1
balls[name] = ball
names[name] = True
names = {}
use += " names:"+str((time.time()*1000) - bgn)
if count < region:
continue
data = {"table":id,"balls":balls,"time":int(time.time()*1000)}
info(data)
json_data = json.dumps(data)
balls = {}
count = 0
use += " dump:"+str((time.time()*1000) - bgn)
if not stream:
yield json_data
else:
yield f"data: {json_data}\n\n"
use += " yield:"+str((time.time()*1000) - bgn)
print("model.track",use)
except GeneratorExit:
print("Client disconnected at", time.ctime())
finally:cap.release()
def workerloop(stop_event,id,source,target=None,region=None):
try:
gen = worker(id,source,region)
for data in gen:
use = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
bgn = (time.time()*1000)
if stop_event.is_set():
break
json_data = json.loads(data)
use += " loads:"+str((time.time()*1000) - bgn)
text = json.dumps({"content":json_data})
use += " dumps:"+str((time.time()*1000) - bgn)
# 创建并启动线程
thread = threading.Thread(target=post_request, args=(target, text))
thread.start()
use += " post:"+str((time.time()*1000) - bgn)
print("workerloop",use)
finally:
gen.close()
def post_request(url, data):
use = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
bgn = (time.time()*1000)
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, data=data,headers=headers)
use += " post:"+str((time.time()*1000) - bgn ) +"ms"
print("post_request",use,response.text)
# import pstats
# import cProfile
# def test_function():
# url = "rtmp://rtmp03open.ys7.com:1935/v3/openlive/L05874022_1_1?expire=1726389273&id=625006970167889920&t=2c7f371063e4ad672cb7a7c34b2a236e9ff37b02052f818c9c4fe7ae9632cd0e&ev=100"
# gen = worker(0,url,5)
# count = 0
# for data in gen:
# count += 1
# if count > 10:
# break
if __name__ == '__main__':
# cProfile.run('test_function()', 'test_function.profile')
# p = pstats.Stats('test_function.profile')
# p.sort_stats('cumulative').print_stats(100) # Top 10 by cumulative time
app.run("0.0.0.0",threaded=True)