billiard/app.py

116 lines
3.4 KiB
Python

import cv2
import json
import time
import requests
import supervision as sv
from ultralytics import YOLO
from flask import Flask,request,Response
from multiprocessing import Process, Event
# Flask application object; the HTTP routes in this module are registered on it.
app = Flask(__name__)
# Registry of detection workers: table id (the <id> URL segment) -> the
# multiprocessing Event used to ask that table's worker process to stop.
workers = {}
@app.route('/start/<id>')
def start(id):
    """Start (or restart) the background detection worker for table ``id``.

    Query parameters:
        source: video source the worker opens with ``cv2.VideoCapture``.
        target: URL that receives every aggregated result via HTTP POST.
        region: optional number of frames per aggregation window.

    Returns:
        A plain-text body: ``'ok'`` once the worker process has been
        launched, otherwise a short message naming the missing parameter.
    """
    # Signal any worker already registered for this table to stop, so that a
    # restart does not leave two processes reporting for the same table.
    previous = workers.get(id)
    if previous is not None:
        previous.set()

    source = request.args.get('source')
    target = request.args.get('target')
    region = request.args.get('region')

    if target is None:
        # NOTE(review): an unreachable second statement used to follow this
        # return: a server-sent-events Response built from
        # worker(id, source, region, True). It was dead code and is removed;
        # restore it in place of this return if streaming without a target
        # is the desired behaviour.
        return "please input target url"
    if source is None:
        # Without a source the worker has nothing to read, yet the caller
        # would still have been told 'ok'.
        return "please input source url"

    stop_event = Event()
    p = Process(target=workerloop, args=(stop_event, id, source, target, region))
    p.start()
    workers[id] = stop_event
    return 'ok'
@app.route('/stop/<id>')
def stop(id):
    """Signal the worker registered for table ``id`` to stop, if there is one.

    The response is ``'ok'`` whether or not a worker was registered.
    """
    stop_event = workers.get(id)
    if stop_event is None:
        # Nothing registered under this id; nothing to signal.
        return 'ok'
    stop_event.set()
    return 'ok'
@app.route('/show', methods=['POST'])
def show():
    """Print the raw body of the incoming POST request and answer ``'ok'``."""
    body = request.data
    print(body)
    return 'ok'
def worker(id,source,region=None,stream=False):
    """Yield aggregated ball detections for one table as JSON strings.

    Frames are read from ``source`` and passed to the YOLO tracker. Per-class
    data is accumulated over a window of ``region`` frames; when the window is
    full one JSON document is yielded and the accumulation starts over. The
    generator ends when a frame can no longer be read.

    Args:
        id: table identifier. Echoed in the payload as ``"table"``;
            ``int(id) % 4`` is passed to the tracker as ``device``.
        source: video source handed to ``cv2.VideoCapture``.
        region: frames per window (any value ``int()`` accepts); ``None``
            selects the default of 10.
        stream: when true, every payload is wrapped as a server-sent event
            (``data: <json>`` followed by a blank line); otherwise the bare
            JSON string is yielded.

    Yields:
        str: JSON of the form ``{"table": id, "balls": {...}, "time": ms}``.
        ``balls`` maps a class name to its latest ``tkid``, ``conf`` and
        ``xyxy``, a ``count`` of detections in the window and, if the class
        was seen in the last three frames of the window, a ``final`` count.
    """
    region = 10 if region is None else int(region)
    model = YOLO('./best.pt')
    balls = {}
    count = 0
    print(source)
    # Open the capture before entering the try block: if the constructor
    # raises there is nothing to release, and the finally clause must not
    # fail on an unbound name and hide the real error.
    cap = cv2.VideoCapture(source)
    try:
        if not cap.isOpened():
            print("Error opening video stream.")
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Error read video stream.")
                break
            # NOTE(review): int(id) % 4 presumably spreads tables over four
            # devices -- confirm against the deployment hardware.
            result = model.track(frame, show=False, stream=False,
                                 persist=True, device=int(id) % 4)
            result = result[0]
            detections = sv.Detections.from_yolov8(result)
            if result.boxes.id is not None:
                detections.tracker_id = result.boxes.id.cpu().numpy().astype(int)
            # Deliberately `!= None`, not `is not None`: the comparison is
            # meant to be element-wise over the tracker-id array.
            # NOTE(review): when no ids were assigned above this compares
            # None with None -- confirm supervision accepts that index.
            detections = detections[(detections.tracker_id != None)]  # noqa: E711
            count += 1
            for xyxy, _, confidence, class_id, tracker_id in detections:
                name = model.model.names[class_id]
                ball = balls.get(name, {})
                ball["tkid"] = int(tracker_id)
                ball["conf"] = round(float(confidence), 2)
                ball["xyxy"] = [int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3])]
                ball["count"] = ball.get("count", 0) + 1
                # True only for the last three frames of the window.
                if region - count < 3:
                    ball["final"] = ball.get("final", 0) + 1
                balls[name] = ball
            if count < region:
                continue
            # Window complete: emit it and start a fresh one.
            json_data = json.dumps({"table": id, "balls": balls, "time": int(time.time() * 1000)})
            balls = {}
            count = 0
            if not stream:
                yield json_data
            else:
                yield f"data: {json_data}\n\n"
    except GeneratorExit:
        # Raised at the yield when the consumer closes the generator.
        print("Client disconnected at", time.ctime())
    finally:
        cap.release()
def workerloop(stop_event,id,source,target=None,region=None,timeout=10):
    """Run ``worker`` and POST every result window to ``target`` until stopped.

    Args:
        stop_event: object exposing ``is_set()``; it is checked once per
            result window and the loop ends as soon as it is set.
        id: forwarded unchanged to ``worker``.
        source: forwarded unchanged to ``worker``.
        target: URL that receives ``{"content": <window payload>}`` as a JSON
            body.
        region: forwarded unchanged to ``worker``.
        timeout: seconds to wait for ``target`` on each POST, so that a
            stalled receiver cannot block the loop (and the stop check)
            indefinitely.
    """
    # Set the request headers; they are identical for every POST.
    headers = {
        "Content-Type": "application/json"
    }
    # Calling a generator function does not run its body, so creating it
    # outside the try block keeps `gen` bound for the finally clause.
    gen = worker(id, source, region)
    try:
        for data in gen:
            if stop_event.is_set():
                break
            text = json.dumps({"content": json.loads(data)})
            print(text)
            try:
                response = requests.post(target, data=text, headers=headers, timeout=timeout)
            except (requests.ConnectionError, requests.Timeout) as err:
                # A transient delivery failure drops this window only; any
                # other request error (e.g. a malformed URL) still propagates.
                print("Error posting result:", err)
                continue
            print(response.text)
    finally:
        gen.close()
if __name__ == '__main__':
    # Script entry point: serve on all network interfaces with the threaded
    # development server.
    app.run(host="0.0.0.0", threaded=True)