کد زیر کاملا درست کار میکند من میخواهم هر ۵۰ میلی ...

创建于:2025年3月17日

使用 O1 回答 Chat01

问题

کد زیر کاملا درست کار میکند
من میخواهم هر ۵۰ میلی ثانیه یکبار فریم را نمایش بدم در صورتی که در صف tracker چیزی بود آن را با فریم نمایش بدم و اگر هم نبود فقط فریم خالی را نمایش بدهم

کد نهایی را کامل و بدون ارور بنویس و از deque استفاده کن
یادت باشه دقیق سر هر ۵۰ میلی ثانیه نمایش بده و اگر tracker صف داده داشت اون رو کپی کنه و از اون استفاده کنه

کد:

import cv2
import numpy as np
import time
import warnings
import logging
import os
from collections import deque
import threading

import tensorrt as trt
import pycuda.driver as cuda

توجه داشته باشید از import pycuda.autoinit استفاده نشده

چون ممکن است با کانتکست‌های سفارشی تداخل کند.

from deep_sort_realtime.deepsort_tracker import DeepSort

حذف اخطارهای مربوط به np.bool در نسخه‌های جدید NumPy

np.bool = np.bool_
warnings.filterwarnings("ignore", category=FutureWarning, message=".np.bool.")

########################################

تنظیمات اولیه logging

########################################
logging.basicConfig(
level=logging.INFO, # سطح ثبت لاگ‌ها
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)

########################################

کلاس‌های BaseEmbedder + TRTReIDEmbedder

########################################
class BaseEmbedder:
def init(self, embedding_size=128):
self.embedding_size = embedding_size

text
def forward(self, x): raise NotImplementedError("Subclasses must implement this method.")

class TRTReIDEmbedder(BaseEmbedder):
def init(self, engine_path, embedding_size=128):
super().init(embedding_size=embedding_size)
self.logger = trt.Logger(trt.Logger.WARNING)
self.runtime = trt.Runtime(self.logger)
self.engine = self._load_engine(engine_path)
self.context = self.engine.create_execution_context()
self.inputs, self.outputs, self.bindings = self._allocate_buffers()

text
def _load_engine(self, engine_path): with open(engine_path, "rb") as f: engine_data = f.read() return self.runtime.deserialize_cuda_engine(engine_data) def _allocate_buffers(self): inputs, outputs, bindings = [], [], [] for i in range(self.engine.num_bindings): name = self.engine.get_tensor_name(i) shape = self.engine.get_tensor_shape(name) dtype = self.engine.get_tensor_dtype(name) size = trt.volume(shape) np_type = trt.nptype(dtype) buf = cuda.mem_alloc(size * np_type().itemsize) bindings.append(buf) mode = self.engine.get_tensor_mode(name) if mode == trt.TensorIOMode.INPUT: inputs.append(buf) else: outputs.append(buf) return inputs, outputs, bindings def preprocess_single(self, image): # اگر کانال رنگی اشتباه باشد، اصلاح می‌کنیم if image.shape[0] == 3 and len(image.shape) == 3: image = image.transpose(1, 2, 0) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image = cv2.resize(image, (224, 224)) image = image.astype(np.float32) / 255.0 mean = np.array([0.485, 0.456, 0.406], dtype=np.float32) std = np.array([0.229, 0.224, 0.225], dtype=np.float32) image = (image - mean) / std image = image.transpose(2, 0, 1) image = np.expand_dims(image, axis=0) return np.ascontiguousarray(image) def forward(self, np_image_crops): if len(np_image_crops) == 0: return np.empty((0, self.embedding_size), dtype=np.float32) outputs = [] for crop in np_image_crops: processed = self.preprocess_single(crop) cuda.memcpy_htod(self.inputs[0], processed.ravel()) self.context.execute_v2(self.bindings) # در صورت نیاز به همگام‌سازی با GPU cuda.Context.synchronize() output_name = self.engine.get_tensor_name(1) output_shape = self.engine.get_tensor_shape(output_name) output = np.empty(output_shape, dtype=np.float32) cuda.memcpy_dtoh(output, self.outputs[0]) outputs.append(output) return np.concatenate(outputs, axis=0)

########################################

YOLO TensorRT Inference Class

########################################
class MFNET:
def init(self, engine_path):
self.logger = trt.Logger(trt.Logger.WARNING)
self.runtime = trt.Runtime(self.logger)
self.engine = self._load_engine(engine_path)
self.context = self.engine.create_execution_context()
self.inputs, self.outputs, self.bindings = self._allocate_buffers()

text
def _load_engine(self, engine_path): with open(engine_path, "rb") as f: engine_data = f.read() return self.runtime.deserialize_cuda_engine(engine_data) def _allocate_buffers(self): inputs, outputs, bindings = [], [], [] for i in range(self.engine.num_bindings): name = self.engine.get_tensor_name(i) shape = self.engine.get_tensor_shape(name) dtype = self.engine.get_tensor_dtype(name) size = trt.volume(shape) np_dtype = trt.nptype(dtype) buffer = cuda.mem_alloc(size * np_dtype().itemsize) bindings.append(int(buffer)) io_mode = self.engine.get_tensor_mode(name) if io_mode == trt.TensorIOMode.INPUT: inputs.append(buffer) else: outputs.append(buffer) return inputs, outputs, bindings def infer(self, input_batch): cuda.memcpy_htod(self.inputs[0], input_batch) self.context.execute_v2(self.bindings) cuda.Context.synchronize() # فرض می‌کنیم فقط یک خروجی با ایندکس 1 داریم output_name = self.engine.get_tensor_name(1) output_shape = self.engine.get_tensor_shape(output_name) output = np.empty(trt.volume(output_shape), dtype=np.float32) cuda.memcpy_dtoh(output, self.outputs[0]) return output.reshape(-1, 10)

########################################

NMS / Postprocessing

########################################
def nms(boxes, scores, iou_threshold):
x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
areas = (x2 - x1) * (y2 - y1)
order = np.argsort(scores)[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0.0, xx2 - xx1)
h = np.maximum(0.0, yy2 - yy1)
overlap = (w * h) / (areas[i] + areas[order[1:]] - w * h)
order = order[np.where(overlap <= iou_threshold)[0] + 1]
return np.array(keep)

def fast_postprocess(output, conf_threshold=0.1, iou_threshold=0.4):
if output.shape[0] == 0:
return np.empty((0, 4)), np.array([]), np.array([])

text
boxes, conf_scores, class_scores = output[:, :4], output[:, 4], output[:, 5:] mask = conf_scores >= conf_threshold boxes, scores, class_confidences = boxes[mask], conf_scores[mask], class_scores[mask] if len(boxes) == 0: return np.empty((0, 4)), np.array([]), np.array([]) max_class_conf = np.max(class_confidences, axis=1) mask2 = max_class_conf >= conf_threshold boxes, scores, class_confidences = boxes[mask2], scores[mask2], class_confidences[mask2] if len(boxes) == 0: return np.empty((0, 4)), np.array([]), np.array([]) x_center, y_center, width, height = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] x_min = x_center - width / 2 y_min = y_center - height / 2 x_max = x_center + width / 2 y_max = y_center + height / 2 boxes_xyxy = np.stack([x_min, y_min, x_max, y_max], axis=1) inds = nms(boxes_xyxy, scores, iou_threshold) boxes_final = boxes_xyxy[inds] scores_final = scores[inds] class_ids_final = np.argmax(class_confidences[inds], axis=1) return boxes_final, scores_final, class_ids_final

########################################

تابع کمکی تغییر سایز و آماده‌سازی ورودی

########################################
def letterbox_image(image, target_size):
ih, iw = image.shape[:2]
h, w = target_size
scale = min(w / iw, h / ih)
new_w, new_h = int(iw * scale), int(ih * scale)
resized_image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
pad_w, pad_h = (w - new_w) // 2, (h - new_h) // 2
padded_image = np.full((h, w, 3), 128, dtype=np.uint8)
padded_image[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized_image
return padded_image, scale, pad_w, pad_h

def preprocess_image(image, input_size=(640, 640)):
img, scale, pad_w, pad_h = letterbox_image(image, input_size)
img = img.astype(np.float32) / 255.0
img = img.transpose(2, 0, 1)
img = np.expand_dims(img, axis=0)
return np.ascontiguousarray(img), scale, pad_w, pad_h

########################################

رشته‌ی نمایش + خواندن ویدیو (وظیفه‌ی شماره 1 خواسته‌شده)

########################################
def display_thread_func(video_path, frame_queue, tracking_queue ,output_folder='output_frames'):
"""
در این تابع:
1. ویدیو باز می‌شود و فریم‌ها یکی‌یکی خوانده می‌شوند.
2. هر 50 میلی‌ثانیه یکبار، فریم فعلی در صف frame_queue قرار می‌گیرد (برای دیتکشن).
3. به صورت غیرمسدود، نتایج ترک را از tracking_queue برداشته و اگر نتیجه‌ جدیدی وجود داشت،
باکس‌ها را روی همان فریم می‌کشد.
4. فریم را (چه با باکس، چه بدون باکس) نمایش می‌دهد.
5. اگر به انتهای ویدیو رسیدیم، None در صف frame_queue گذاشته و خارج می‌شویم.
6. برای جلوگیری از انسداد، از get_nowait استفاده می‌شود و اگر چیزی نبود، صبر نمی‌کنیم.
"""
cap = cv2.VideoCapture(video_path)
frame_id = 0
last_tracking_data = None # آخرین نتیجه‌ی ترک که دریافت کردیم

text
# برای کشیدن مسیر فریم‌های اخیر history_tracks = deque(maxlen=20) while True: ret, frame = cap.read() if not ret: # اتمام ویدیو frame_queue.put(None) # سیگنال پایان به رشته‌ی دیتکشن break # print("frame_id : ", frame_id) frame_queue.append((frame_id, frame)) # اضافه کردن به انتهای صف if frame_queue: # بررسی اینکه صف خالی نباشد print("************************************************************************************************") last_tracking_data = frame_queue[0] # دسترسی به اولین عنصر بدون حذف آن frame_id, frame = last_tracking_data print("frame_id : ", frame_id) else: continue if tracking_queue: print("----------------------------------------------------------------------------------") out = tracking_queue[0] frame_id, frame, detection_list, track_objects = out # print("last_tracking_data") # کشیدن باکس‌های دیتکشن (اختیاری) for (x1, y1, x2, y2, score, cls_id) in detection_list: dx1, dy1, dw, dh = x1, y1, x2, y2 # dx2 = dx1 + dw # dy2 = dy1 + dh cv2.rectangle(frame, (dx1, dy1), (dw, dh), (100, 0, 255), 2) for bbox in track_objects: tid, x1, y1, x2, y2 = bbox cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 255, 0), 2) cv2.putText(frame, f"Track ID: {tid}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2, cv2.LINE_AA) # آپدیت history_tracks history_tracks.append([frame_id, track_objects]) # کشیدن مرکز ترک در فریم‌های اخیر (مسیر حرکت) for older_frame_id, objects in history_tracks: for obj in objects: _, x1, y1, x2, y2 = obj center_x = (x1 + x2) // 2 center_y = (y1 + y2) // 2 cv2.circle(frame, (center_x, center_y), radius=3, color=(200, 100, 0), thickness=-1) # ذخیره فریم در خروجی out_path = os.path.join(output_folder, f"frame_{frame_id}.jpg") cv2.imwrite(out_path, frame) # تلاش برای خواندن همه‌ی نتایج ترک موجود در صف، تا آخرین نتیجه را به‌روز کنیم # while True: # # if frame_queue: # بررسی اینکه صف خالی نباشد # print("************************************************************************************************") # # last_tracking_data = frame_queue[0] # دسترسی به اولین عنصر بدون حذف آن # frame , frame_id = last_tracking_data # else: # continue # # print("tracking_queue : ", len(tracking_queue)) # # if tracking_queue: # print("----------------------------------------------------------------------------------") # # # out = tracking_queue[0] # # frame_id ,frame ,detection_list ,track_objects = out # # print("last_tracking_data") # # # کشیدن باکس‌های دیتکشن (اختیاری) # for (x1, y1, x2, y2, score, cls_id) in detection_list: # dx1, dy1, dw, dh = x1, y1, x2, y2 # # dx2 = dx1 + dw # # dy2 = dy1 + dh # cv2.rectangle(frame, (dx1, dy1), (dw, dh), (100, 0, 255), 2) # # for bbox in track_objects: # tid, x1, y1, x2, y2 = bbox # cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 255, 0), 2) # cv2.putText(frame, f"Track ID: {tid}", (x1, y1 - 10), # cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), # 2, cv2.LINE_AA) # # # آپدیت history_tracks # history_tracks.append([frame_id, track_objects]) # # کشیدن مرکز ترک در فریم‌های اخیر (مسیر حرکت) # for older_frame_id, objects in history_tracks: # for obj in objects: # _, x1, y1, x2, y2 = obj # center_x = (x1 + x2) // 2 # center_y = (y1 + y2) // 2 # cv2.circle(frame, (center_x, center_y), radius=3, # color=(200, 100, 0), thickness=-1) # # # # ذخیره فریم در خروجی # out_path = os.path.join(output_folder, f"frame_{frame_id}.jpg") # cv2.imwrite(out_path, frame) # # else: # continue # نمایش فریم # print("نمایشششششششششششششششششششششششششششششششششششششششششششششششششششششششششششششششششش") cv2.imshow("Display", frame) frame_id += 1 # هر 50 میلی‌ثانیه یکبار بروزرسانی نمایش if cv2.waitKey(50) & 0xFF == ord('q'): break cap.release() cv2.destroyAllWindows() logging.info("[Display Thread] Finished Displaying & Reading Video.")

########################################

رشته‌ی دیتکشن (وظیفه‌ی شماره 2 خواسته‌شده)

########################################
def detection_thread_func(frame_queue, detection_queue, yolo_engine_path):
"""
این تابع فریم‌ها را از frame_queue برمی‌دارد و روی آنها دیتکشن می‌زند.
اگر صف خالی بود، منتظر نمی‌ماند (به کمک get_nowait) و ادامه می‌دهد.
نتیجه‌ی دیتکشن را در detection_queue می‌گذارد.
"""
cuda.init()
device = cuda.Device(0)
ctx = device.make_context()

text
try: # بارگذاری مدل YOLO yolo_model = MFNET(yolo_engine_path) while True: print("detection") # تلاش برای برداشتن فریم از صف به صورت غیرمسدود # try: # item = frame_queue.get_nowait() # except queue.Empty: # # اگر خالی است، کمی صبر کن و ادامه بده # # time.sleep(0.01) # continue if frame_queue: # بررسی اینکه صف خالی نباشد item = frame_queue[0] # دسترسی به اولین عنصر بدون حذف آن # استفاده از item بدون حذف از صف else: continue if item is None: # یعنی دیگر فریمی نداریم، پایان detection_queue.put(None) break frame_id, frame = item t0 = time.time() # انجام دیتکشن input_batch, scale, pad_w, pad_h = preprocess_image(frame) detections = yolo_model.infer(input_batch) final_boxes, final_scores, final_class_ids = fast_postprocess( detections, conf_threshold=0.1, iou_threshold=0.4 ) detection_time = (time.time() - t0) * 1000.0 # تبدیل به فرمت قابل استفاده در DeepSort detection_list = [] for box, score, cls_id in zip(final_boxes, final_scores, final_class_ids): x1, y1, x2, y2 = (box / scale).astype(int) x1, x2 = x1 - pad_w, x2 - pad_w y1, y2 = y1 - pad_h, y2 - pad_h detection_list.append((x1, y1, x2, y2, score, cls_id)) # قرار دادن در صف detection_queue.append((frame_id, frame, detection_list, detection_time)) # print("frame_id : ", frame_id , "frame : ", frame, "detection_list : ", detection_list) finally: ctx.pop() logging.info("[Detection Thread] Finished & CUDA context popped.")

########################################

رشته‌ی ترکینگ (وظیفه‌ی شماره 3 خواسته‌شده)

########################################
def tracker_thread_func(detection_queue, tracking_queue, reid_engine_path):
"""
این تابع نتیجه‌ی دیتکشن را از detection_queue برداشته و ترکینگ انجام می‌دهد.
سپس نتیجه را در tracking_queue می‌گذارد.
اگر detection_queue خالی بود، منتظر نمی‌ماند و ادامه می‌دهد.
"""
cuda.init()
device = cuda.Device(0)
ctx = device.make_context()

text
try: # بارگذاری مدل ReID trt_reid_embedder = TRTReIDEmbedder( engine_path=reid_engine_path, embedding_size=128 ) trt_reid_embedder.predict = trt_reid_embedder.forward # ساخت DeepSort tracker = DeepSort( max_age=50, n_init=3, nms_max_overlap=1.0, embedder=None, embedder_gpu=True, nn_budget=100, half=False ) tracker.embedder = trt_reid_embedder track_total_list = [] while True: print("tracker") # try: # item = detection_queue.get_nowait() # except queue.Empty: # # اگر خالی بود، کمی صبر کنیم و رد شویم # time.sleep(0.01) # continue if detection_queue: # بررسی اینکه صف خالی نباشد item = detection_queue[0] # دسترسی به اولین عنصر بدون حذف آن # استفاده از item بدون حذف از صف else: continue if item is None: # سیگنال پایان tracking_queue.put(None) break frame_id, frame, detection_list , detection_time = item # تبدیل فرمت دیتکشن برای DeepSort deepsort_detections = [] for (x1, y1, x2, y2, score, cls_id) in detection_list: w, h = x2 - x1, y2 - y1 deepsort_detections.append(([x1, y1, w, h], score, cls_id, frame_id)) t0 = time.time() # ترکینگ tracks = tracker.update_tracks(deepsort_detections, frame=frame) track_time = (time.time() - t0) * 1000.0 track_total_list.append(track_time) # استخراج باکس‌های نهایی ترک track_objects = [] frame_h, frame_w = frame.shape[:2] for t in tracks: if not t.is_confirmed(): continue track_id = t.track_id ltrb = t.to_ltrb() bx1 = max(0, int(ltrb[0])) by1 = max(0, int(ltrb[1])) bx2 = min(frame_w, int(ltrb[2])) by2 = min(frame_h, int(ltrb[3])) track_objects.append([track_id, bx1, by1, bx2, by2]) # # قرار دادن در صف نمایش # tracking_queue.append({ # 'frame_id': frame_id, # 'frame': frame, # 'detection_list': detection_list, # 'track_objects': bboxes, # 'detection_time': detection_time, # 'track_time': track_time # }) # قرار دادن در صف نمایش tracking_queue.append((frame_id, frame, detection_list, track_objects)) # print("tracking_queue : ", tracking_queue) if len(track_total_list) > 0: mean_track_time = np.mean(track_total_list) std_dev = np.std(track_total_list) logging.info(f"[Tracker Thread] Mean Track Time: {mean_track_time:.2f} ms") logging.info(f"[Tracker Thread] Std Dev Track Time: {std_dev:.2f} ms") finally: ctx.pop() logging.info("[Tracker Thread] Finished & CUDA context popped.")

########################################

تابع اصلی

########################################
def main():
# مسیرها
video_path = "/home/xaviernx/Videos/final_640.mkv"
yolo_engine_path = "/home/xaviernx/my files/nx/my files/model/640x640/mfnet.trt"
reid_engine_path = "/home/xaviernx/Documents/resnet18.trt"

text
# ایجاد صف‌ها frame_queue = deque(maxlen=1) detection_queue = deque(maxlen=1) tracking_queue = deque(maxlen=1) # ساخت Threadها detection_thread = threading.Thread( target=detection_thread_func, args=(frame_queue, detection_queue, yolo_engine_path), name="DetectionThread" ) tracker_thread = threading.Thread( target=tracker_thread_func, args=(detection_queue, tracking_queue, reid_engine_path), name="TrackerThread" ) display_thread = threading.Thread( target=display_thread_func, args=(video_path, frame_queue, tracking_queue), name="DisplayThread" ) # شروع Threadها detection_thread.start() tracker_thread.start() display_thread.start() # منتظر ماندن تا همه تمام شوند detection_thread.join() tracker_thread.join() display_thread.join() logging.info("[Main] All threads finished.")

if name == "main":
main()

思考

Managing video frames

The display_thread_func efficiently reads video frames, overlays tracker data, displays, and stores them every 50 ms using time.sleep or cv2.waitKey.

Refining the solution

Ensuring thread-safety, the user aims to display a frame every 50 ms, leveraging a queue or deque to manage the tracker data effectively.

Ensuring non-blocking flow

Focusing on non-blocking frame reading and displaying every 50 ms, while integrating tracking data if available, to achieve a seamless and efficient video display.

Enhancing logic

I’m working to efficiently display frames every 50 ms, ensuring thread-safety with a deque, and integrating tracking data if available. This approach leads me to a more streamlined implementation.

Enhancing thread-safety

I'm thinking through managing tracking data and avoiding deque overflow by consistently removing entries, ensuring smooth, thread-safe video frame processing.

Correcting logic

OK, let me see. Ensuring detection_queue entries are removed after processing prevents redundant frame revisit by each thread, leading to efficient and accurate tracking.

Crafting the solution

I’m optimizing queue management for efficient video frame tracking, incorporating consistent 50 ms intervals, initializing device contexts, and refining thread logic for smoother performance.

Tweaking code logic

I’m addressing queue management issues by tweaking the logic to remove items properly and incorporating a 50 ms display interval for smoother video playback.

Streamlining queue signals

OK, so the final frames will be stored, and specific signals will indicate when it's time for detection, tracking, and display threads to cease operations.

Shifting the strategy

I’m streamlining the detection and tracking threads to read frames from queues non-blocking, ensuring efficiency without lingering references, and omitting redundant code segments.

Reasoned for 41 seconds

回答

در کد زیر، منطق صف‌ها (Queue/Deque) به‌صورت درست پیاده‌سازی شده است تا هر بار دادهٔ جدیدی که از مرحلهٔ قبلی تولید می‌شود، فقط یک بار مصرف شود (از پُشت صف اضافه و از جلوی صف برداشته می‌شود). همچنین در رشتهٔ نمایش (Display)، دقیقاً هر ۵۰ میلی‌ثانیه یک فریم را نمایش می‌دهیم. اگر در صف ترکینگ (tracking_queue) خروجی‌ای وجود داشت، آن را با فریم می‌کشیم و اگر نبود، همان فریم خام را نشان می‌دهیم.

نکته: در این کد از سه صف (deque) استفاده شده است:

  • frame_queue: اشتراک فریم خام بین Display و Detection
  • detection_queue: اشتراک نتایج دیتکشن بین Detection و Tracker
  • tracking_queue: اشتراک نتایج ترکینگ بین Tracker و Display

همچنین دقت کنید که برای پایان کار هر رشته (Thread)، از گذاشتن None در صف مربوطه استفاده شده است و هر رشته بعد از دیدن None، کار خود را خاتمه می‌دهد.

python
import cv2 import numpy as np import time import warnings import logging import os import threading import tensorrt as trt import pycuda.driver as cuda # توجه داشته باشید از import pycuda.autoinit استفاده نشده # چون ممکن است با کانتکست‌های سفارشی تداخل کند. from collections import deque from deep_sort_realtime.deepsort_tracker import DeepSort # حذف اخطارهای مربوط به np.bool در نسخه‌های جدید NumPy np.bool = np.bool_ warnings.filterwarnings("ignore", category=FutureWarning, message=".*np.bool.*") ######################################## # تنظیمات اولیه logging ######################################## logging.basicConfig( level=logging.INFO, # سطح ثبت لاگ‌ها format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) ######################################## # کلاس‌های BaseEmbedder + TRTReIDEmbedder ######################################## class BaseEmbedder: def __init__(self, embedding_size=128): self.embedding_size = embedding_size def forward(self, x): raise NotImplementedError("Subclasses must implement this method.") class TRTReIDEmbedder(BaseEmbedder): def __init__(self, engine_path, embedding_size=128): super().__init__(embedding_size=embedding_size) self.logger = trt.Logger(trt.Logger.WARNING) self.runtime = trt.Runtime(self.logger) self.engine = self._load_engine(engine_path) self.context = self.engine.create_execution_context() self.inputs, self.outputs, self.bindings = self._allocate_buffers() def _load_engine(self, engine_path): with open(engine_path, "rb") as f: engine_data = f.read() return self.runtime.deserialize_cuda_engine(engine_data) def _allocate_buffers(self): inputs, outputs, bindings = [], [], [] for i in range(self.engine.num_bindings): name = self.engine.get_tensor_name(i) shape = self.engine.get_tensor_shape(name) dtype = self.engine.get_tensor_dtype(name) size = trt.volume(shape) np_type = trt.nptype(dtype) buf = cuda.mem_alloc(size * np_type().itemsize) bindings.append(buf) mode = self.engine.get_tensor_mode(name) if mode == trt.TensorIOMode.INPUT: inputs.append(buf) else: outputs.append(buf) return inputs, outputs, bindings def preprocess_single(self, image): # اگر کانال رنگی اشتباه باشد، اصلاح می‌کنیم if image.shape[0] == 3 and len(image.shape) == 3: image = image.transpose(1, 2, 0) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image = cv2.resize(image, (224, 224)) image = image.astype(np.float32) / 255.0 mean = np.array([0.485, 0.456, 0.406], dtype=np.float32) std = np.array([0.229, 0.224, 0.225], dtype=np.float32) image = (image - mean) / std image = image.transpose(2, 0, 1) image = np.expand_dims(image, axis=0) return np.ascontiguousarray(image) def forward(self, np_image_crops): if len(np_image_crops) == 0: return np.empty((0, self.embedding_size), dtype=np.float32) outputs = [] for crop in np_image_crops: processed = self.preprocess_single(crop) cuda.memcpy_htod(self.inputs[0], processed.ravel()) self.context.execute_v2(self.bindings) cuda.Context.synchronize() output_name = self.engine.get_tensor_name(1) output_shape = self.engine.get_tensor_shape(output_name) output = np.empty(output_shape, dtype=np.float32) cuda.memcpy_dtoh(output, self.outputs[0]) outputs.append(output) return np.concatenate(outputs, axis=0) ######################################## # YOLO TensorRT Inference Class ######################################## class MFNET: def __init__(self, engine_path): self.logger = trt.Logger(trt.Logger.WARNING) self.runtime = trt.Runtime(self.logger) self.engine = self._load_engine(engine_path) self.context = self.engine.create_execution_context() self.inputs, self.outputs, self.bindings = self._allocate_buffers() def _load_engine(self, engine_path): with open(engine_path, "rb") as f: engine_data = f.read() return self.runtime.deserialize_cuda_engine(engine_data) def _allocate_buffers(self): inputs, outputs, bindings = [], [], [] for i in range(self.engine.num_bindings): name = self.engine.get_tensor_name(i) shape = self.engine.get_tensor_shape(name) dtype = self.engine.get_tensor_dtype(name) size = trt.volume(shape) np_dtype = trt.nptype(dtype) buffer = cuda.mem_alloc(size * np_dtype().itemsize) bindings.append(int(buffer)) io_mode = self.engine.get_tensor_mode(name) if io_mode == trt.TensorIOMode.INPUT: inputs.append(buffer) else: outputs.append(buffer) return inputs, outputs, bindings def infer(self, input_batch): cuda.memcpy_htod(self.inputs[0], input_batch) self.context.execute_v2(self.bindings) cuda.Context.synchronize() # فرض می‌کنیم فقط یک خروجی با ایندکس 1 داریم output_name = self.engine.get_tensor_name(1) output_shape = self.engine.get_tensor_shape(output_name) output = np.empty(trt.volume(output_shape), dtype=np.float32) cuda.memcpy_dtoh(output, self.outputs[0]) return output.reshape(-1, 10) ######################################## # NMS / Postprocessing ######################################## def nms(boxes, scores, iou_threshold): x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] areas = (x2 - x1) * (y2 - y1) order = np.argsort(scores)[::-1] keep = [] while order.size > 0: i = order[0] keep.append(i) xx1 = np.maximum(x1[i], x1[order[1:]]) yy1 = np.maximum(y1[i], y1[order[1:]]) xx2 = np.minimum(x2[i], x2[order[1:]]) yy2 = np.minimum(y2[i], y2[order[1:]]) w = np.maximum(0.0, xx2 - xx1) h = np.maximum(0.0, yy2 - yy1) overlap = (w * h) / (areas[i] + areas[order[1:]] - w * h) order = order[np.where(overlap <= iou_threshold)[0] + 1] return np.array(keep) def fast_postprocess(output, conf_threshold=0.1, iou_threshold=0.4): if output.shape[0] == 0: return np.empty((0, 4)), np.array([]), np.array([]) boxes, conf_scores, class_scores = output[:, :4], output[:, 4], output[:, 5:] mask = conf_scores >= conf_threshold boxes, scores, class_confidences = boxes[mask], conf_scores[mask], class_scores[mask] if len(boxes) == 0: return np.empty((0, 4)), np.array([]), np.array([]) max_class_conf = np.max(class_confidences, axis=1) mask2 = max_class_conf >= conf_threshold boxes, scores, class_confidences = boxes[mask2], scores[mask2], class_confidences[mask2] if len(boxes) == 0: return np.empty((0, 4)), np.array([]), np.array([]) x_center, y_center, width, height = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] x_min = x_center - width / 2 y_min = y_center - height / 2 x_max = x_center + width / 2 y_max = y_center + height / 2 boxes_xyxy = np.stack([x_min, y_min, x_max, y_max], axis=1) inds = nms(boxes_xyxy, scores, iou_threshold) boxes_final = boxes_xyxy[inds] scores_final = scores[inds] class_ids_final = np.argmax(class_confidences[inds], axis=1) return boxes_final, scores_final, class_ids_final ######################################## # تابع کمکی تغییر سایز و آماده‌سازی ورودی ######################################## def letterbox_image(image, target_size): ih, iw = image.shape[:2] h, w = target_size scale = min(w / iw, h / ih) new_w, new_h = int(iw * scale), int(ih * scale) resized_image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR) pad_w, pad_h = (w - new_w) // 2, (h - new_h) // 2 padded_image = np.full((h, w, 3), 128, dtype=np.uint8) padded_image[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized_image return padded_image, scale, pad_w, pad_h def preprocess_image(image, input_size=(640, 640)): img, scale, pad_w, pad_h = letterbox_image(image, input_size) img = img.astype(np.float32) / 255.0 img = img.transpose(2, 0, 1) img = np.expand_dims(img, axis=0) return np.ascontiguousarray(img), scale, pad_w, pad_h ######################################## # رشته‌ی نمایش (Display) ######################################## def display_thread_func(video_path, frame_queue, tracking_queue, output_folder='output_frames'): """ 1. ویدیو باز می‌شود و فریم‌ها یکی‌یکی خوانده می‌شوند. 2. هر بار فریم در صف frame_queue قرار می‌گیرد. 3. دقیقاً هر 50 میلی‌ثانیه یکبار، فریم جاری را نمایش می‌دهیم. اگر در صف tracking_queue دیتای جدیدی موجود بود، آن را روی همین فریم اعمال می‌کنیم. اگر نبود، همان فریم خام را نشان می‌دهیم. 4. در انتها، None در صف گذاشته و خارج می‌شویم. """ if not os.path.exists(output_folder): os.makedirs(output_folder) cap = cv2.VideoCapture(video_path) frame_id = 0 history_tracks = deque(maxlen=20) while True: ret, frame = cap.read() if not ret: # اتمام ویدیو؛ سیگنال پایان به رشته Detection frame_queue.append(None) break # فریم جدید را در صف قرار می‌دهیم تا رشته Detection استفاده کند frame_queue.append((frame_id, frame.copy())) # در اینجا می‌خواهیم اگر ترکینگ خروجی جدید دارد، اعمال کنیم # در غیر این صورت، همان فریم خام را نشان دهیم frame_to_show = frame.copy() # بررسی می‌کنیم آیا صف ترکینگ داده جدیدی دارد # برای اینکه همیشه آخرین خروجی را دریافت کنیم، در حلقه می‌چرخیم تا همه‌ی آیتم‌های قدیمی خالی شود latest_result = None while True: if len(tracking_queue) > 0: item = tracking_queue.popleft() if item is not None: latest_result = item else: # اگر None دیدیم، یعنی کار آن رشته تمام شده # ولی ما همچنان ممکن است فریم‌هایی داشته باشیم تا نمایش دهیم # اینجا فعلاً رد می‌شویم pass else: # صف خالی شد break # اگر latest_result در طول این 50ms آپدیت شد، آن را اعمال کنیم if latest_result is not None: _, tracked_frame, detection_list, track_objects = latest_result # در اینجا می‌توانیم داده را بر روی frame فعلی اعمال کنیم # ولی باید دقت کنیم که آیتمی که از tracker آمده، مربوط به کدام frame_id بوده است. # برای simplicity فرض می‌کنیم همان فریم باشد؛ # اما در عمل بهتر است frame_id ها را چک کنید که هماهنگ باشد. # کشیدن باکس‌های دیتکشن (اختیاری) for (x1, y1, x2, y2, score, cls_id) in detection_list: cv2.rectangle(frame_to_show, (x1, y1), (x2, y2), (100, 0, 255), 2) # کشیدن باکس‌های ترک for bbox in track_objects: tid, x1, y1, x2, y2 = bbox cv2.rectangle(frame_to_show, (x1, y1), (x2, y2), (255, 255, 0), 2) cv2.putText(frame_to_show, f"ID: {tid}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2, cv2.LINE_AA) # مسیر حرکت (history_tracks) - اگر نیاز دارید history_tracks.append([frame_id, track_objects]) for older_frame_id, objects in history_tracks: for obj in objects: _, hx1, hy1, hx2, hy2 = obj center_x = (hx1 + hx2) // 2 center_y = (hy1 + hy2) // 2 cv2.circle(frame_to_show, (center_x, center_y), radius=3, color=(200, 100, 0), thickness=-1) # ذخیره فریم خروجی out_path = os.path.join(output_folder, f"frame_{frame_id}.jpg") cv2.imwrite(out_path, frame_to_show) # نمایش فریم (دقیقاً هر 50 میلی‌ثانیه) cv2.imshow("Display", frame_to_show) frame_id += 1 if cv2.waitKey(50) & 0xFF == ord('q'): break cap.release() cv2.destroyAllWindows() logging.info("[Display Thread] Finished Displaying & Reading Video.") ######################################## # رشته‌ی دیتکشن (Detection) ######################################## def detection_thread_func(frame_queue, detection_queue, yolo_engine_path): """ این تابع، فریم‌ها را از frame_queue برمی‌دارد و روی آنها دیتکشن می‌زند. سپس نتیجه دیتکشن را در detection_queue قرار می‌دهد. وقتی None ببیند، یعنی کار تمام است و None را در detection_queue می‌گذارد و خارج می‌شود. """ cuda.init() device = cuda.Device(0) ctx = device.make_context() try: yolo_model = MFNET(yolo_engine_path) while True: # تلاش برای برداشتن یک آیتم از frame_queue if len(frame_queue) == 0: # صف خالی است time.sleep(0.01) continue item = frame_queue.popleft() if item is None: # پایان کار detection_queue.append(None) break frame_id, frame = item t0 = time.time() input_batch, scale, pad_w, pad_h = preprocess_image(frame) detections = yolo_model.infer(input_batch) final_boxes, final_scores, final_class_ids = fast_postprocess( detections, conf_threshold=0.1, iou_threshold=0.4 ) detection_time = (time.time() - t0) * 1000.0 # تبدیل به فرمت قابل استفاده در DeepSort detection_list = [] for box, score, cls_id in zip(final_boxes, final_scores, final_class_ids): x1, y1, x2, y2 = (box / scale).astype(int) x1, x2 = x1 - pad_w, x2 - pad_w y1, y2 = y1 - pad_h, y2 - pad_h detection_list.append((x1, y1, x2, y2, score, cls_id)) # نتیجه را به صف detection_queue ارسال می‌کنیم detection_queue.append((frame_id, frame, detection_list, detection_time)) finally: ctx.pop() logging.info("[Detection Thread] Finished & CUDA context popped.") ######################################## # رشته‌ی ترکینگ (Tracker) ######################################## def tracker_thread_func(detection_queue, tracking_queue, reid_engine_path): """ این تابع نتیجه‌ی دیتکشن را از detection_queue برمی‌دارد و ترکینگ انجام می‌دهد. سپس نتیجه را در tracking_queue قرار می‌دهد. وقتی None در detection_queue ببیند، پایان کار را اعلام می‌کند. """ cuda.init() device = cuda.Device(0) ctx = device.make_context() try: trt_reid_embedder = TRTReIDEmbedder(engine_path=reid_engine_path, embedding_size=128) trt_reid_embedder.predict = trt_reid_embedder.forward tracker = DeepSort( max_age=50, n_init=3, nms_max_overlap=1.0, embedder=None, embedder_gpu=True, nn_budget=100, half=False ) tracker.embedder = trt_reid_embedder track_times = [] while True: if len(detection_queue) == 0: time.sleep(0.01) continue item = detection_queue.popleft() if item is None: # پایان کار tracking_queue.append(None) break frame_id, frame, detection_list, detection_time = item # تبدیل فرمت دیتکشن برای DeepSort deepsort_detections = [] for (x1, y1, x2, y2, score, cls_id) in detection_list: w, h = x2 - x1, y2 - y1 deepsort_detections.append(([x1, y1, w, h], score, cls_id, frame_id)) t0 = time.time() tracks = tracker.update_tracks(deepsort_detections, frame=frame) track_time = (time.time() - t0) * 1000.0 track_times.append(track_time) track_objects = [] frame_h, frame_w = frame.shape[:2] for t in tracks: if not t.is_confirmed(): continue track_id = t.track_id ltrb = t.to_ltrb() bx1 = max(0, int(ltrb[0])) by1 = max(0, int(ltrb[1])) bx2 = min(frame_w, int(ltrb[2])) by2 = min(frame_h, int(ltrb[3])) track_objects.append([track_id, bx1, by1, bx2, by2]) # خروجی را به صف tracking_queue می‌فرستیم tracking_queue.append((frame_id, frame, detection_list, track_objects)) if len(track_times) > 0: mean_track_time = np.mean(track_times) std_dev = np.std(track_times) logging.info(f"[Tracker Thread] Mean Track Time: {mean_track_time:.2f} ms") logging.info(f"[Tracker Thread] Std Dev Track Time: {std_dev:.2f} ms") finally: ctx.pop() logging.info("[Tracker Thread] Finished & CUDA context popped.") ######################################## # تابع اصلی ######################################## def main(): video_path = "/home/xaviernx/Videos/final_640.mkv" yolo_engine_path = "/home/xaviernx/my files/nx/my files/model/640x640/mfnet.trt" reid_engine_path = "/home/xaviernx/Documents/resnet18.trt" # ایجاد صف‌ها frame_queue = deque(maxlen=1) detection_queue = deque(maxlen=1) tracking_queue = deque(maxlen=1) detection_thread = threading.Thread( target=detection_thread_func, args=(frame_queue, detection_queue, yolo_engine_path), name="DetectionThread" ) tracker_thread = threading.Thread( target=tracker_thread_func, args=(detection_queue, tracking_queue, reid_engine_path), name="TrackerThread" ) display_thread = threading.Thread( target=display_thread_func, args=(video_path, frame_queue, tracking_queue), name="DisplayThread" ) # شروع رشته‌ها detection_thread.start() tracker_thread.start() display_thread.start() # منتظر می‌مانیم تا همه کارشان را تمام کنند detection_thread.join() tracker_thread.join() display_thread.join() logging.info("[Main] All threads finished.") if __name__ == "__main__": main()

توضیح کلی:

  • در تابع display_thread_func، ابتدا ویدیو را باز کرده و در هر تکرار، یک فریم می‌خوانیم و آن را در frame_queue می‌گذاریم. سپس تلاش می‌کنیم جدیدترین خروجی موجود در tracking_queue را (اگر باشد) بخوانیم و بر روی فریم اعمال کنیم. در نهایت با cv2.waitKey(50) دقیقاً هر ۵۰ میلی‌ثانیه، فریم را نمایش می‌دهیم.
  • در رشتهٔ دیتکشن (detection_thread_func) فریم را از frame_queue دریافت کرده، مدل YOLO را اجرا می‌کنیم و خروجی دیتکشن را در detection_queue قرار می‌دهیم.
  • در رشتهٔ ترکینگ (tracker_thread_func) نتایج دیتکشن را از detection_queue برداشته، عملیات DeepSort را انجام می‌دهیم و نتایج را در tracking_queue قرار می‌دهیم.
  • در نهایت اگر یکی از رشته‌ها به پایان رسید (مشاهدهٔ None در صف ورودی)، با قرار دادن None در صف خروجی، رشتهٔ بعدی را نیز از اتمام کار مطلع می‌کند و همگی خاتمه می‌یابند.

با این روش، دقیقاً هر ۵۰ms یکبار فریم نمایش داده می‌شود و اگر در این فاصله، نتیجه‌ای در صف ترکینگ وجود داشته باشد، آن نتیجه بر روی فریم اعمال خواهد شد. در غیر این صورت، فریم خام را نمایش می‌دهیم.

分享这个问答