import cv2 import json import time import requests import supervision as sv from datetime import datetime from ultralytics import YOLO from flask import Flask,request,Response from multiprocessing import Process, Event app = Flask(__name__) workers = {} @app.route('/start/') def start(id): table = workers.get(id) if table is not None: table.set() source = request.args.get('source') target = request.args.get('target') region = request.args.get('region') if target is None: return "please input target url" return Response(worker(id,source,region,True), mimetype='text/event-stream') stop_event = Event() p = Process(target=workerloop, args=(stop_event,id,source,target,region)) p.start() workers[id] = stop_event return 'ok' @app.route('/stop/') def stop(id): table = workers.get(id) if table is not None: table.set() return 'ok' @app.route('/show',methods=['POST']) def show(): # print(request.json) data = request.json["content"] balls = data["balls"] table = data["table"] text = f"table:{table:>3} " for i in range(16): ball = "#" if "ball"+str(i) in balls else " " text += f"| {i} : {ball} " print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),text) return 'ok' def worker(id,source,region=None,stream=False): if region == None: region = 10 else: region = int(region) model = YOLO('./best.pt') balls = {} count = 0 print(source) try: cap = cv2.VideoCapture(source) if not cap.isOpened(): print("Error opening video stream.") while True: ret, frame = cap.read() if not ret: print("Error read video stream.") break result = model.track(frame,show=False,stream=False,persist=True,device=int(id)%4) result = result[0] detections = sv.Detections.from_yolov8(result) if result.boxes.id is not None: detections.tracker_id = result.boxes.id.cpu().numpy().astype(int) detections = detections[(detections.tracker_id != None)] count += 1 names = {} for xyxy,_, confidence, class_id, tracker_id in detections: name = model.model.names[class_id] if name in names: print("Name duplicate",name) continue ball = balls.get(name,{}) ball["tkid"] = int(tracker_id) ball["conf"] = round(float(confidence), 2) ball["xyxy"] = [int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3])] ball["count"] = ball.get("count",0) + 1 if region - count < 3: ball["final"] = ball.get("final",0) + 1 balls[name] = ball names[name] = True names = {} if count < region: continue json_data = json.dumps({"table":id,"balls":balls,"time":int(time.time()*1000)}) balls = {} count = 0 if not stream: yield json_data else: yield f"data: {json_data}\n\n" except GeneratorExit: print("Client disconnected at", time.ctime()) finally:cap.release() def workerloop(stop_event,id,source,target=None,region=None): try: gen = worker(id,source,region) for data in gen: if stop_event.is_set(): break json_data = json.loads(data) # 设置请求头 headers = { "Content-Type": "application/json" } text = json.dumps({"content":json_data}) print(text) response = requests.post(target, data=text, headers=headers) print(response.text) finally: gen.close() if __name__ == '__main__': app.run("0.0.0.0",threaded=True)