import json
import time
from datetime import datetime
from multiprocessing import Event, Process

import cv2
import requests
import supervision as sv
import torch
from flask import Flask, Response, request
from ultralytics import YOLO

app = Flask(__name__)

# Maps a table id to the stop Event of its running worker process.
workers = {}


@app.route('/start/<id>')
def start(id):
    # Stop any worker already running for this table before starting a new one.
    running = workers.pop(id, None)
    if running is not None:
        running.set()

    source = request.args.get('source')
    target = request.args.get('target')
    region = request.args.get('region')

    if target is None:
        # No callback URL: stream results to the caller as server-sent events.
        return Response(worker(id, source, region, True), mimetype='text/event-stream')

    stop_event = Event()
    p = Process(target=workerloop, args=(stop_event, id, source, target, region))
    p.start()
    workers[id] = stop_event
    return 'ok'


@app.route('/stop/<id>')
def stop(id):
    running = workers.pop(id, None)
    if running is not None:
        running.set()
    return 'ok'


@app.route('/show', methods=['POST'])
def show():
    # Debug sink: print which of ball0..ball15 were reported for a table.
    data = request.json["content"]
    balls = data["balls"]
    table = data["table"]
    text = f"table:{table:>3} "
    for i in range(16):
        ball = "#" if f"ball{i}" in balls else " "
        text += f"| {i} : {ball} "
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), text)
    return 'ok'


def worker(id, source, region=None, stream=False):
    # `region` is the number of frames aggregated into each emitted report.
    region = 10 if region is None else int(region)
    model = YOLO('./best.pt')
    balls = {}
    count = 0
    device_count = torch.cuda.device_count()
    print(device_count, source)
    # Spread tables across the available GPUs; fall back to the CPU if there are none.
    device = int(id) % device_count if device_count else 'cpu'

    cap = cv2.VideoCapture(source)
    try:
        if not cap.isOpened():
            print("Error opening video stream.")
            return
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Error reading video stream.")
                break
            result = model.track(frame, show=False, stream=False, persist=True, device=device)[0]
            count += 1

            # Without tracker ids there is nothing to record for this frame.
            if result.boxes.id is not None:
                detections = sv.Detections.from_yolov8(result)
                detections.tracker_id = result.boxes.id.cpu().numpy().astype(int)
                seen = set()
                for xyxy, _, confidence, class_id, tracker_id in detections:
                    name = model.model.names[int(class_id)]
                    if name in seen:
                        print("Name duplicate", name)
                        continue
                    seen.add(name)
                    ball = balls.get(name, {})
                    ball["tkid"] = int(tracker_id)
                    ball["conf"] = round(float(confidence), 2)
                    ball["xyxy"] = [int(v) for v in xyxy]
                    ball["count"] = ball.get("count", 0) + 1
                    # "final" counts sightings in the last three frames of the window.
                    if region - count < 3:
                        ball["final"] = ball.get("final", 0) + 1
                    balls[name] = ball

            if count < region:
                continue
            json_data = json.dumps({"table": id, "balls": balls, "time": int(time.time() * 1000)})
            balls = {}
            count = 0
            yield f"data: {json_data}\n\n" if stream else json_data
    except GeneratorExit:
        print("Client disconnected at", time.ctime())
    finally:
        cap.release()


def workerloop(stop_event, id, source, target=None, region=None):
    gen = worker(id, source, region)
    # Set the request headers
    headers = {"Content-Type": "application/json"}
    try:
        for data in gen:
            if stop_event.is_set():
                break
            text = json.dumps({"content": json.loads(data)})
            print(text)
            try:
                response = requests.post(target, data=text, headers=headers, timeout=10)
                print(response.text)
            except requests.RequestException as exc:
                # Keep the worker alive if the callback endpoint is unreachable.
                print("Failed to post result:", exc)
    finally:
        gen.close()


if __name__ == '__main__':
    app.run("0.0.0.0", threaded=True)