import cv2 import json import time import requests import supervision as sv from ultralytics import YOLO from flask import Flask,request,Response from multiprocessing import Process, Event app = Flask(__name__) workers = {} @app.route('/start/') def start(id): table = workers.get(id) if table is not None: table.set() source = request.args.get('source') target = request.args.get('target') region = request.args.get('region') if target is None: return "please input target url" return Response(worker(id,source,region,True), mimetype='text/event-stream') stop_event = Event() p = Process(target=workerloop, args=(stop_event,id,source,target,region)) p.start() workers[id] = stop_event return 'ok' @app.route('/stop/') def stop(id): table = workers.get(id) if table is not None: table.set() return 'ok' @app.route('/show',methods=['POST']) def show(): print(request.data) return 'ok' def worker(id,source,region=None,stream=False): if region == None: region = 1000 else: region = int(region) model = YOLO('./best.pt') balls = {} timebegin = int(time.time()*1000) print(source) try: cap = cv2.VideoCapture(source) if not cap.isOpened(): print("Error opening video stream.") while True: ret, frame = cap.read() if not ret: print("Error read video stream.") break result = model.track(frame,show=False,stream=False,persist=True,device=int(id)%4) result = result[0] detections = sv.Detections.from_yolov8(result) if result.boxes.id is not None: detections.tracker_id = result.boxes.id.cpu().numpy().astype(int) detections = detections[(detections.tracker_id != None)] for xyxy,_, confidence, class_id, tracker_id in detections: balls[model.model.names[class_id]] = { "tkid": int(tracker_id), "conf": round(float(confidence), 2), "xyxy": [int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3]) ] } if timebegin > int(time.time()*1000): continue json_data = json.dumps({"table":id,"balls":balls,"time":timebegin}) timebegin += region balls = {} if not stream: yield json_data else: yield f"data: {json_data}\n\n" except GeneratorExit: print("Client disconnected at", time.ctime()) finally:cap.release() def workerloop(stop_event,id,source,target=None,region=None): try: gen = worker(id,source,region) for data in gen: if stop_event.is_set(): break json_data = json.loads(data) # 设置请求头 headers = { "Content-Type": "application/json" } text = json.dumps({"content":json_data}) print(text) response = requests.post(target, data=text, headers=headers) print(response.text) finally: gen.close() if __name__ == '__main__': app.run("0.0.0.0",threaded=True)