173 lines
5.6 KiB
Python
173 lines
5.6 KiB
Python
import cv2
|
|
import json
|
|
import time
|
|
import requests
|
|
import threading
|
|
import numpy as np
|
|
import supervision as sv
|
|
|
|
from datetime import datetime
|
|
from ultralytics import YOLO
|
|
from flask import Flask,request,Response
|
|
from multiprocessing import Process, Event
|
|
|
|
# Flask application object; the route decorators in this file register on it.
app = Flask(__name__)

# Table id (the <id> URL segment) -> stop Event of the worker process that
# was most recently started for that id.
workers = dict()
|
|
|
|
@app.route('/start/<id>')
def start(id):
    """Start (or restart) the background detection worker for table ``id``.

    Query parameters:
        source: video source forwarded to the worker (opened there with
            ``cv2.VideoCapture``).
        target: URL that each aggregated result is POSTed to.
        region: optional; forwarded unchanged to the worker, which uses it
            as the number of frames per aggregation window.

    Any worker already registered under ``id`` is signalled to stop before
    the parameters are checked, exactly as before.

    Returns:
        ``'ok'`` once the worker process has been started, or a short
        plain-text message when ``target`` or ``source`` is missing.
    """
    previous = workers.get(id)
    if previous is not None:
        # Ask the worker that currently owns this id to exit its loop.
        previous.set()

    source = request.args.get('source')
    target = request.args.get('target')
    region = request.args.get('region')

    if target is None:
        # An unreachable `return Response(worker(...))` used to follow this
        # return; it could never run and has been dropped.
        return "please input target url"
    if source is None:
        # Without a source the child process could only crash on startup.
        return "please input source url"

    stop_event = Event()
    process = Process(
        target=workerloop,
        args=(stop_event, id, source, target, region),
    )
    process.start()
    workers[id] = stop_event
    return 'ok'
|
|
|
|
@app.route('/stop/<id>')
def stop(id):
    """Signal the worker registered under ``id`` (if any) to stop.

    Always answers ``'ok'``, whether or not a worker was registered.
    """
    stop_event = workers.get(id)
    if stop_event is None:
        return 'ok'
    stop_event.set()
    return 'ok'
|
|
|
|
@app.route('/show',methods=['POST'])
def show():
    """Print a summary of the ``"content"`` object of a POSTed JSON body.

    The body is expected to look like ``{"content": {...}}``; the inner
    object is handed to ``info``. Always answers ``'ok'``.
    """
    info(request.json["content"])
    return 'ok'
|
|
|
|
def info(data):
    """Print a timestamped one-line overview of the balls present in ``data``.

    ``data`` must provide ``"table"`` and ``"balls"``. For every index 0-15
    a ``#`` is shown when the key ``"ball<index>"`` is in ``data["balls"]``,
    otherwise a blank.
    """
    seen = data["balls"]
    table = data["table"]
    header = f"table:{table:>3} "
    cells = []
    for idx in range(16):
        mark = "#" if f"ball{idx}" in seen else " "
        cells.append(f"| {idx} : {mark} ")
    line = header + "".join(cells)
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), line)
|
|
|
|
def _reopen_capture(cap, source, delays=(0, 1, 3)):
    """Release ``cap`` and try to reopen ``source`` once per entry in ``delays``.

    Each attempt waits the given number of seconds first. The capture from
    the last attempt is returned whether or not it opened, so the caller
    must check ``isOpened()``.
    """
    for attempt, delay in enumerate(delays, start=2):
        # Release the handle being replaced so reconnects do not leak it.
        cap.release()
        if delay:
            time.sleep(delay)
        cap = cv2.VideoCapture(source)
        if cap.isOpened():
            break
        # Two, three, then four trailing dots tell the attempts apart.
        print("Error opening video stream" + "." * attempt)
    return cap


def worker(id,source,region=None,stream=False):
    """Track balls in a video source and yield one summary per frame window.

    Frames are read from ``source`` and run through the YOLO tracker. After
    every ``region`` successfully read frames the accumulated per-ball data
    is printed via ``info`` and yielded, then the accumulation starts over.

    Args:
        id: table identifier; copied into the result and, taken modulo 4,
            used as the ``device`` argument of the tracker
            (presumably a GPU index — confirm against the deployment).
        source: anything ``cv2.VideoCapture`` accepts.
        region: frames per window; ``None`` means 10, other values go
            through ``int()``.
        stream: when true each item is framed as a server-sent event
            (``"data: <json>\\n\\n"``), otherwise the bare JSON text is
            yielded.

    Yields:
        JSON text of ``{"table": id, "balls": {...}, "time": <epoch ms>}``.
        Each ball entry holds the latest ``tkid``, ``conf`` and ``xyxy``,
        a ``count`` of frames it appeared in, and ``final``, the number of
        appearances within the last three frames of the window.

    The generator ends when the source cannot be read and every reopen
    attempt fails, or when it is closed by the consumer.
    """
    region = 10 if region is None else int(region)
    device = int(id) % 4  # loop-invariant, so computed once
    model = YOLO('./best.pt')
    balls = {}
    count = 0
    print(source)

    # Opened outside the try so the finally clause always has a capture to
    # release, even when a later statement raises.
    cap = cv2.VideoCapture(source)
    try:
        if not cap.isOpened():
            # Not fatal here: the first failed read triggers the reopen path.
            print("Error opening video stream.")
        while True:
            use = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            bgn = time.time() * 1000
            ret, frame = cap.read()
            use += " read:" + str((time.time() * 1000) - bgn)
            if not ret:
                print("Error read video stream.")
                cap = _reopen_capture(cap, source)
                if not cap.isOpened():
                    break
                continue

            results = model.track(frame, show=False, stream=False, persist=True, device=device)
            use += " track:" + str((time.time() * 1000) - bgn)
            del frame  # the image is no longer needed; free it early
            result = results[0]
            detections = sv.Detections.from_yolov8(result)
            if result.boxes.id is not None:
                detections.tracker_id = result.boxes.id.cpu().numpy().astype(int)
            else:
                # No track ids for this frame: empty the detections so the
                # loop below does nothing.
                detections.tracker_id = np.array([])
                # NOTE(review): `conf` is kept as in the original; confirm
                # whether `confidence` was the intended attribute.
                detections.conf = np.array([])
                detections.xyxy = np.empty((0, 4), dtype=np.float32)

            count += 1
            seen = set()
            for xyxy, _, confidence, class_id, tracker_id in detections:
                name = model.model.names[class_id]
                if name in seen:
                    # Only the first detection of a class counts per frame.
                    print("Name duplicate",name)
                    continue
                seen.add(name)
                ball = balls.setdefault(name, {})
                ball["tkid"] = int(tracker_id)
                ball["conf"] = round(float(confidence), 2)
                ball["xyxy"] = [int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3])]
                ball["count"] = ball.get("count", 0) + 1
                if region - count < 3:
                    # Frame belongs to the last three of the current window.
                    ball["final"] = ball.get("final", 0) + 1
            use += " names:" + str((time.time() * 1000) - bgn)

            if count < region:
                continue

            data = {"table": id, "balls": balls, "time": int(time.time() * 1000)}
            info(data)
            json_data = json.dumps(data)
            balls = {}
            count = 0
            use += " dump:" + str((time.time() * 1000) - bgn)
            if stream:
                yield f"data: {json_data}\n\n"
            else:
                yield json_data
            use += " yield:" + str((time.time() * 1000) - bgn)
            print(use)
    except GeneratorExit:
        # Raised at the yield when the consumer closes the generator.
        print("Client disconnected at", time.ctime())
    finally:
        cap.release()
|
|
|
|
def workerloop(stop_event,id,source,target=None,region=None):
    """Process entry point: forward every ``worker`` result to ``target``.

    Each JSON result produced by ``worker(id, source, region)`` is wrapped
    as ``{"content": <result>}`` and POSTed to ``target`` on its own thread.
    The loop stops at the first result that arrives after ``stop_event``
    has been set; the generator is always closed on the way out.
    """
    results = worker(id, source, region)
    try:
        for payload in results:
            timing = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            started = time.time() * 1000
            if stop_event.is_set():
                break

            content = json.loads(payload)
            timing += " loads:" + str((time.time() * 1000) - started)
            body = json.dumps({"content": content})
            timing += " dumps:" + str((time.time() * 1000) - started)

            # Create and start a thread so the POST does not block the loop.
            poster = threading.Thread(target=post_request, args=(target, body))
            poster.start()

            timing += " post:" + str((time.time() * 1000) - started)
            print("gen", timing)
    finally:
        results.close()
|
|
|
|
def post_request(url, data, timeout=10):
    """POST ``data`` to ``url`` as JSON and print the response body.

    Args:
        url: destination URL.
        data: request body, sent as-is with a JSON content type.
        timeout: seconds to wait for the server. Without a timeout a
            stalled server would keep this call, and the thread running
            it, blocked indefinitely.

    Request failures (connection errors, timeouts, invalid URLs) are
    reported on stdout instead of raising.
    """
    headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(url, data=data, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        print("post failed:", exc)
        return
    print(response.text)
|
|
|
|
if __name__ == '__main__':
    # Listen on every interface and handle requests in separate threads.
    app.run(host="0.0.0.0", threaded=True)
|