"""Flask service that runs a YOLO model over video streams and reports balls.

Endpoints:
    GET  /start/<id>  start detection for table ``id`` (query: source, target, region)
    GET  /stop/<id>   signal the background detection process of table ``id`` to stop
    POST /show        print a detection report posted as ``{"content": {...}}``
"""

import json
import time
import threading
from datetime import datetime
from multiprocessing import Process, Event

# Third-party imports are kept in their original relative order.
import cv2
import torch
import requests
import numpy as np
import supervision as sv
from ultralytics import YOLO
from flask import Flask, request, Response

app = Flask(__name__)

# Maps a table id to the stop Event of the background process started for it.
workers = {}

_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# (pause before the attempt in seconds, message printed when the attempt fails)
_REOPEN_ATTEMPTS = (
    (0, "Error opening video stream.."),
    (1, "Error opening video stream..."),
    (3, "Error opening video stream...."),
)


def _timestamp():
    """Return the current local time formatted for log lines."""
    return datetime.now().strftime(_TIMESTAMP_FORMAT)


def _now_ms():
    """Return the current wall-clock time in milliseconds."""
    return time.time() * 1000


@app.route('/start/<id>')
def start(id):
    """Start detection for table ``id``.

    Query parameters:
        source: video source handed to ``cv2.VideoCapture`` (required).
        target: URL that receives each report via POST. When omitted, the
            reports are streamed back to the caller as server-sent events.
        region: number of processed frames aggregated into one report.

    Any background worker already registered for ``id`` is signalled to stop
    before the new one is started.
    """
    previous = workers.pop(id, None)
    if previous is not None:
        previous.set()

    source = request.args.get('source')
    target = request.args.get('target')
    region = request.args.get('region')

    # worker() cannot open a capture without a source; fail the request early
    # instead of crashing inside the generator or the child process.
    if not source:
        return "please input source url", 400

    if target is None:
        # No push target: stream the reports to this client instead.
        return Response(worker(id, source, region, True),
                        mimetype='text/event-stream')

    stop_event = Event()
    p = Process(target=workerloop,
                args=(stop_event, id, source, target, region))
    p.start()
    workers[id] = stop_event
    return 'ok'


@app.route('/stop/<id>')
def stop(id):
    """Signal the background worker of table ``id`` (if any) to stop."""
    stop_event = workers.pop(id, None)
    if stop_event is not None:
        stop_event.set()
    return 'ok'


@app.route('/show', methods=['POST'])
def show():
    """Print the report found under the ``content`` key of the JSON body."""
    data = request.json["content"]
    info(data)
    return 'ok'


def info(data):
    """Print a one-line summary of a report.

    ``data`` must provide ``"table"`` and ``"balls"``. The line shows the
    table id followed by 16 slots; slot ``i`` is marked ``#`` when the key
    ``"ball<i>"`` is present in ``data["balls"]``.
    """
    balls = data["balls"]
    table = data["table"]
    slots = "".join(
        f"| {i} : {'#' if 'ball' + str(i) in balls else ' '} "
        for i in range(16)
    )
    print(_timestamp(), f"table:{table:>3} " + slots)


def _reopen_capture(cap, source):
    """Release ``cap`` and try to reopen ``source`` with increasing pauses.

    Returns the last capture object created; the caller must check
    ``isOpened()`` to find out whether reopening succeeded.
    """
    for pause, failure_message in _REOPEN_ATTEMPTS:
        if pause:
            time.sleep(pause)
        # Release the previous handle so failed attempts do not leak it.
        cap.release()
        cap = cv2.VideoCapture(source)
        if cap.isOpened():
            break
        print(failure_message)
    return cap


def _accumulate_detections(detections, class_names, balls, count, region):
    """Merge one frame's detections into the per-window ``balls`` dict.

    Only the first detection of each class name in a frame is used; later
    ones are reported as duplicates and skipped. For every class name the
    entry keeps the latest tracker id, confidence and box, the number of
    frames it was seen in (``count``) and how many of those fell into the
    last three frames of the window (``final``).
    """
    seen = set()
    for xyxy, _, confidence, class_id, tracker_id in detections:
        name = class_names[class_id]
        if name in seen:
            print("Name duplicate", name)
            continue
        seen.add(name)

        ball = balls.setdefault(name, {})
        # Plain prediction (no tracker) yields no tracker id.
        ball["tkid"] = None if tracker_id is None else int(tracker_id)
        ball["conf"] = round(float(confidence), 2)
        ball["xyxy"] = [int(v) for v in xyxy]
        ball["count"] = ball.get("count", 0) + 1
        if region - count < 3:
            ball["final"] = ball.get("final", 0) + 1


def worker(id, source, region=None, stream=False):
    """Generate aggregated detection reports for one video source.

    Args:
        id: table identifier copied into every report.
        source: video source handed to ``cv2.VideoCapture``.
        region: number of processed frames per report (default 10); may be
            given as a string.
        stream: when true, each report is wrapped as a server-sent event
            (``data: ...\\n\\n``); otherwise the bare JSON string is yielded.

    Yields:
        A JSON string ``{"table", "balls", "time"}`` once every ``region``
        processed frames. Every second captured frame is processed; the
        other one is read and dropped.

    The generator ends when the source can no longer be reopened. The
    capture is released when the generator finishes or is closed.
    """
    region = 10 if region is None else int(region)
    print("start loading model...", id, source, region)
    model = YOLO('./best.pt')
    print("start loaded model!!!")

    balls = {}
    count = 0
    delay = 0
    bgn = 0
    cap = None  # defined before the try so the finally block is always safe
    try:
        cap = cv2.VideoCapture(source)
        if not cap.isOpened():
            print("Error opening video stream.")
        if isinstance(source, str) and source.endswith(".mp4"):
            # Files are paced to their native frame rate; a capture that
            # failed to open reports 0 fps, which must not be divided by.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps > 0:
                delay = 1000.0 / fps
            print("start video stream... ", fps, delay)

        while True:
            use = _timestamp()
            if bgn != 0 and delay != 0:
                # Two frames are consumed per iteration, hence 2 * delay.
                # Compute the remaining time once so it cannot turn negative
                # between the check and the sleep.
                wait_ms = 2 * delay - (_now_ms() - bgn)
                if wait_ms > 0:
                    print("sleep", wait_ms)
                    time.sleep(wait_ms / 1000)
            bgn = _now_ms()

            cap.read()  # drop one frame; only every second frame is analysed
            ret, frame = cap.read()
            use += " read:" + str(_now_ms() - bgn)
            if not ret:
                print("Error read video stream.")
                cap = _reopen_capture(cap, source)
                if not cap.isOpened():
                    break
                continue

            # result = model.track(frame,device=int(id)%4,tracker='botsort.yaml')
            result = model(frame)[0]
            use += " track:" + str(_now_ms() - bgn)
            del frame

            detections = sv.Detections.from_yolov8(result)
            if result.boxes.id is not None:
                detections.tracker_id = (
                    result.boxes.id.cpu().numpy().astype(int)
                )
            # Without tracker ids the detections are kept as they are; their
            # "tkid" is reported as None instead of discarding every box.

            count += 1
            _accumulate_detections(
                detections, model.model.names, balls, count, region
            )
            use += " names:" + str(_now_ms() - bgn)
            if count < region:
                continue

            data = {"table": id, "balls": balls, "time": int(_now_ms())}
            info(data)
            json_data = json.dumps(data)
            balls = {}
            count = 0
            use += " dump:" + str(_now_ms() - bgn)
            if not stream:
                yield json_data
            else:
                yield f"data: {json_data}\n\n"
            use += " yield:" + str(_now_ms() - bgn)
            print("model.track", use)
    except GeneratorExit:
        print("Client disconnected at", time.ctime())
    finally:
        if cap is not None:
            cap.release()


def workerloop(stop_event, id, source, target=None, region=None):
    """Run ``worker`` and POST every report to ``target`` until stopped.

    ``stop_event`` is checked each time the worker produces a report. Every
    report is wrapped as ``{"content": report}`` and sent from its own
    thread so that a slow target does not stall frame processing.
    """
    gen = worker(id, source, region)
    try:
        for data in gen:
            use = _timestamp()
            bgn = _now_ms()
            if stop_event.is_set():
                break
            json_data = json.loads(data)
            use += " loads:" + str(_now_ms() - bgn)
            text = json.dumps({"content": json_data})
            use += " dumps:" + str(_now_ms() - bgn)
            # Create and start the sender thread.
            thread = threading.Thread(target=post_request,
                                      args=(target, text))
            thread.start()
            use += " post:" + str(_now_ms() - bgn)
            print("workerloop", use)
    finally:
        gen.close()


def post_request(url, data, timeout=10):
    """POST the JSON string ``data`` to ``url`` and print the outcome.

    Args:
        url: destination URL.
        data: request body, already serialised as JSON.
        timeout: seconds to wait for the target before giving up, so that an
            unresponsive target cannot pile up sender threads.

    Request failures are printed rather than raised, because this function
    runs in a fire-and-forget thread.
    """
    use = _timestamp()
    bgn = _now_ms()
    headers = {
        "Content-Type": "application/json"
    }
    try:
        response = requests.post(url, data=data, headers=headers,
                                 timeout=timeout)
    except requests.RequestException as exc:
        use += " post:" + str(_now_ms() - bgn) + "ms"
        print("post_request", use, "failed:", exc)
        return
    use += " post:" + str(_now_ms() - bgn) + "ms"
    print("post_request", use, response.text)


# Profiling helper (disabled):
# import pstats
# import cProfile
# def test_function():
#     url = "<rtmp stream url>"  # do not commit stream URLs that embed tokens
#     gen = worker(0, url, 5)
#     count = 0
#     for data in gen:
#         count += 1
#         if count > 10:
#             break

if __name__ == '__main__':
    # cProfile.run('test_function()', 'test_function.profile')
    # p = pstats.Stats('test_function.profile')
    # p.sort_stats('cumulative').print_stats(100)  # top entries by cumulative time
    app.run("0.0.0.0", threaded=True)