کد زیر کاملا درست کار میکند من میخواهم هر ۵۰ میلی ...
创建于:2025年3月17日
创建于:2025年3月17日
کد زیر کاملا درست کار میکند
من میخواهم هر ۵۰ میلی ثانیه یکبار فریم را نمایش بدم در صورتی که در صف tracker چیزی بود آن را با فریم نمایش بدم و اگر هم نبود فقط فریم خالی را نمایش بدهم
کد نهایی را کامل و بدون ارور بنویس و از deque استفاده کن
یادت باشه دقیق سر هر ۵۰ میلی ثانیه نمایش بده و اگر tracker صف داده داشت اون رو کپی کنه و از اون استفاده کنه
کد:
import cv2
import numpy as np
import time
import warnings
import logging
import os
from collections import deque
import threading
import tensorrt as trt
import pycuda.driver as cuda
from deep_sort_realtime.deepsort_tracker import DeepSort
np.bool = np.bool_
warnings.filterwarnings("ignore", category=FutureWarning, message=".np.bool.")
########################################
########################################
logging.basicConfig(
level=logging.INFO, # سطح ثبت لاگها
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
########################################
########################################
class BaseEmbedder:
def init(self, embedding_size=128):
self.embedding_size = embedding_size
textdef forward(self, x): raise NotImplementedError("Subclasses must implement this method.")
class TRTReIDEmbedder(BaseEmbedder):
def init(self, engine_path, embedding_size=128):
super().init(embedding_size=embedding_size)
self.logger = trt.Logger(trt.Logger.WARNING)
self.runtime = trt.Runtime(self.logger)
self.engine = self._load_engine(engine_path)
self.context = self.engine.create_execution_context()
self.inputs, self.outputs, self.bindings = self._allocate_buffers()
textdef _load_engine(self, engine_path): with open(engine_path, "rb") as f: engine_data = f.read() return self.runtime.deserialize_cuda_engine(engine_data) def _allocate_buffers(self): inputs, outputs, bindings = [], [], [] for i in range(self.engine.num_bindings): name = self.engine.get_tensor_name(i) shape = self.engine.get_tensor_shape(name) dtype = self.engine.get_tensor_dtype(name) size = trt.volume(shape) np_type = trt.nptype(dtype) buf = cuda.mem_alloc(size * np_type().itemsize) bindings.append(buf) mode = self.engine.get_tensor_mode(name) if mode == trt.TensorIOMode.INPUT: inputs.append(buf) else: outputs.append(buf) return inputs, outputs, bindings def preprocess_single(self, image): # اگر کانال رنگی اشتباه باشد، اصلاح میکنیم if image.shape[0] == 3 and len(image.shape) == 3: image = image.transpose(1, 2, 0) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image = cv2.resize(image, (224, 224)) image = image.astype(np.float32) / 255.0 mean = np.array([0.485, 0.456, 0.406], dtype=np.float32) std = np.array([0.229, 0.224, 0.225], dtype=np.float32) image = (image - mean) / std image = image.transpose(2, 0, 1) image = np.expand_dims(image, axis=0) return np.ascontiguousarray(image) def forward(self, np_image_crops): if len(np_image_crops) == 0: return np.empty((0, self.embedding_size), dtype=np.float32) outputs = [] for crop in np_image_crops: processed = self.preprocess_single(crop) cuda.memcpy_htod(self.inputs[0], processed.ravel()) self.context.execute_v2(self.bindings) # در صورت نیاز به همگامسازی با GPU cuda.Context.synchronize() output_name = self.engine.get_tensor_name(1) output_shape = self.engine.get_tensor_shape(output_name) output = np.empty(output_shape, dtype=np.float32) cuda.memcpy_dtoh(output, self.outputs[0]) outputs.append(output) return np.concatenate(outputs, axis=0)
########################################
########################################
class MFNET:
def init(self, engine_path):
self.logger = trt.Logger(trt.Logger.WARNING)
self.runtime = trt.Runtime(self.logger)
self.engine = self._load_engine(engine_path)
self.context = self.engine.create_execution_context()
self.inputs, self.outputs, self.bindings = self._allocate_buffers()
textdef _load_engine(self, engine_path): with open(engine_path, "rb") as f: engine_data = f.read() return self.runtime.deserialize_cuda_engine(engine_data) def _allocate_buffers(self): inputs, outputs, bindings = [], [], [] for i in range(self.engine.num_bindings): name = self.engine.get_tensor_name(i) shape = self.engine.get_tensor_shape(name) dtype = self.engine.get_tensor_dtype(name) size = trt.volume(shape) np_dtype = trt.nptype(dtype) buffer = cuda.mem_alloc(size * np_dtype().itemsize) bindings.append(int(buffer)) io_mode = self.engine.get_tensor_mode(name) if io_mode == trt.TensorIOMode.INPUT: inputs.append(buffer) else: outputs.append(buffer) return inputs, outputs, bindings def infer(self, input_batch): cuda.memcpy_htod(self.inputs[0], input_batch) self.context.execute_v2(self.bindings) cuda.Context.synchronize() # فرض میکنیم فقط یک خروجی با ایندکس 1 داریم output_name = self.engine.get_tensor_name(1) output_shape = self.engine.get_tensor_shape(output_name) output = np.empty(trt.volume(output_shape), dtype=np.float32) cuda.memcpy_dtoh(output, self.outputs[0]) return output.reshape(-1, 10)
########################################
########################################
def nms(boxes, scores, iou_threshold):
x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
areas = (x2 - x1) * (y2 - y1)
order = np.argsort(scores)[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0.0, xx2 - xx1)
h = np.maximum(0.0, yy2 - yy1)
overlap = (w * h) / (areas[i] + areas[order[1:]] - w * h)
order = order[np.where(overlap <= iou_threshold)[0] + 1]
return np.array(keep)
def fast_postprocess(output, conf_threshold=0.1, iou_threshold=0.4):
if output.shape[0] == 0:
return np.empty((0, 4)), np.array([]), np.array([])
textboxes, conf_scores, class_scores = output[:, :4], output[:, 4], output[:, 5:] mask = conf_scores >= conf_threshold boxes, scores, class_confidences = boxes[mask], conf_scores[mask], class_scores[mask] if len(boxes) == 0: return np.empty((0, 4)), np.array([]), np.array([]) max_class_conf = np.max(class_confidences, axis=1) mask2 = max_class_conf >= conf_threshold boxes, scores, class_confidences = boxes[mask2], scores[mask2], class_confidences[mask2] if len(boxes) == 0: return np.empty((0, 4)), np.array([]), np.array([]) x_center, y_center, width, height = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] x_min = x_center - width / 2 y_min = y_center - height / 2 x_max = x_center + width / 2 y_max = y_center + height / 2 boxes_xyxy = np.stack([x_min, y_min, x_max, y_max], axis=1) inds = nms(boxes_xyxy, scores, iou_threshold) boxes_final = boxes_xyxy[inds] scores_final = scores[inds] class_ids_final = np.argmax(class_confidences[inds], axis=1) return boxes_final, scores_final, class_ids_final
########################################
########################################
def letterbox_image(image, target_size):
ih, iw = image.shape[:2]
h, w = target_size
scale = min(w / iw, h / ih)
new_w, new_h = int(iw * scale), int(ih * scale)
resized_image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
pad_w, pad_h = (w - new_w) // 2, (h - new_h) // 2
padded_image = np.full((h, w, 3), 128, dtype=np.uint8)
padded_image[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized_image
return padded_image, scale, pad_w, pad_h
def preprocess_image(image, input_size=(640, 640)):
img, scale, pad_w, pad_h = letterbox_image(image, input_size)
img = img.astype(np.float32) / 255.0
img = img.transpose(2, 0, 1)
img = np.expand_dims(img, axis=0)
return np.ascontiguousarray(img), scale, pad_w, pad_h
########################################
########################################
def display_thread_func(video_path, frame_queue, tracking_queue ,output_folder='output_frames'):
"""
در این تابع:
1. ویدیو باز میشود و فریمها یکییکی خوانده میشوند.
2. هر 50 میلیثانیه یکبار، فریم فعلی در صف frame_queue قرار میگیرد (برای دیتکشن).
3. به صورت غیرمسدود، نتایج ترک را از tracking_queue برداشته و اگر نتیجه جدیدی وجود داشت،
باکسها را روی همان فریم میکشد.
4. فریم را (چه با باکس، چه بدون باکس) نمایش میدهد.
5. اگر به انتهای ویدیو رسیدیم، None در صف frame_queue گذاشته و خارج میشویم.
6. برای جلوگیری از انسداد، از get_nowait استفاده میشود و اگر چیزی نبود، صبر نمیکنیم.
"""
cap = cv2.VideoCapture(video_path)
frame_id = 0
last_tracking_data = None # آخرین نتیجهی ترک که دریافت کردیم
text# برای کشیدن مسیر فریمهای اخیر history_tracks = deque(maxlen=20) while True: ret, frame = cap.read() if not ret: # اتمام ویدیو frame_queue.put(None) # سیگنال پایان به رشتهی دیتکشن break # print("frame_id : ", frame_id) frame_queue.append((frame_id, frame)) # اضافه کردن به انتهای صف if frame_queue: # بررسی اینکه صف خالی نباشد print("************************************************************************************************") last_tracking_data = frame_queue[0] # دسترسی به اولین عنصر بدون حذف آن frame_id, frame = last_tracking_data print("frame_id : ", frame_id) else: continue if tracking_queue: print("----------------------------------------------------------------------------------") out = tracking_queue[0] frame_id, frame, detection_list, track_objects = out # print("last_tracking_data") # کشیدن باکسهای دیتکشن (اختیاری) for (x1, y1, x2, y2, score, cls_id) in detection_list: dx1, dy1, dw, dh = x1, y1, x2, y2 # dx2 = dx1 + dw # dy2 = dy1 + dh cv2.rectangle(frame, (dx1, dy1), (dw, dh), (100, 0, 255), 2) for bbox in track_objects: tid, x1, y1, x2, y2 = bbox cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 255, 0), 2) cv2.putText(frame, f"Track ID: {tid}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2, cv2.LINE_AA) # آپدیت history_tracks history_tracks.append([frame_id, track_objects]) # کشیدن مرکز ترک در فریمهای اخیر (مسیر حرکت) for older_frame_id, objects in history_tracks: for obj in objects: _, x1, y1, x2, y2 = obj center_x = (x1 + x2) // 2 center_y = (y1 + y2) // 2 cv2.circle(frame, (center_x, center_y), radius=3, color=(200, 100, 0), thickness=-1) # ذخیره فریم در خروجی out_path = os.path.join(output_folder, f"frame_{frame_id}.jpg") cv2.imwrite(out_path, frame) # تلاش برای خواندن همهی نتایج ترک موجود در صف، تا آخرین نتیجه را بهروز کنیم # while True: # # if frame_queue: # بررسی اینکه صف خالی نباشد # print("************************************************************************************************") # # last_tracking_data = frame_queue[0] # دسترسی به اولین عنصر بدون حذف آن # frame , frame_id = last_tracking_data # else: # continue # # print("tracking_queue : ", len(tracking_queue)) # # if tracking_queue: # print("----------------------------------------------------------------------------------") # # # out = tracking_queue[0] # # frame_id ,frame ,detection_list ,track_objects = out # # print("last_tracking_data") # # # کشیدن باکسهای دیتکشن (اختیاری) # for (x1, y1, x2, y2, score, cls_id) in detection_list: # dx1, dy1, dw, dh = x1, y1, x2, y2 # # dx2 = dx1 + dw # # dy2 = dy1 + dh # cv2.rectangle(frame, (dx1, dy1), (dw, dh), (100, 0, 255), 2) # # for bbox in track_objects: # tid, x1, y1, x2, y2 = bbox # cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 255, 0), 2) # cv2.putText(frame, f"Track ID: {tid}", (x1, y1 - 10), # cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), # 2, cv2.LINE_AA) # # # آپدیت history_tracks # history_tracks.append([frame_id, track_objects]) # # کشیدن مرکز ترک در فریمهای اخیر (مسیر حرکت) # for older_frame_id, objects in history_tracks: # for obj in objects: # _, x1, y1, x2, y2 = obj # center_x = (x1 + x2) // 2 # center_y = (y1 + y2) // 2 # cv2.circle(frame, (center_x, center_y), radius=3, # color=(200, 100, 0), thickness=-1) # # # # ذخیره فریم در خروجی # out_path = os.path.join(output_folder, f"frame_{frame_id}.jpg") # cv2.imwrite(out_path, frame) # # else: # continue # نمایش فریم # print("نمایشششششششششششششششششششششششششششششششششششششششششششششششششششششششششششششششششش") cv2.imshow("Display", frame) frame_id += 1 # هر 50 میلیثانیه یکبار بروزرسانی نمایش if cv2.waitKey(50) & 0xFF == ord('q'): break cap.release() cv2.destroyAllWindows() logging.info("[Display Thread] Finished Displaying & Reading Video.")
########################################
########################################
def detection_thread_func(frame_queue, detection_queue, yolo_engine_path):
"""
این تابع فریمها را از frame_queue برمیدارد و روی آنها دیتکشن میزند.
اگر صف خالی بود، منتظر نمیماند (به کمک get_nowait) و ادامه میدهد.
نتیجهی دیتکشن را در detection_queue میگذارد.
"""
cuda.init()
device = cuda.Device(0)
ctx = device.make_context()
texttry: # بارگذاری مدل YOLO yolo_model = MFNET(yolo_engine_path) while True: print("detection") # تلاش برای برداشتن فریم از صف به صورت غیرمسدود # try: # item = frame_queue.get_nowait() # except queue.Empty: # # اگر خالی است، کمی صبر کن و ادامه بده # # time.sleep(0.01) # continue if frame_queue: # بررسی اینکه صف خالی نباشد item = frame_queue[0] # دسترسی به اولین عنصر بدون حذف آن # استفاده از item بدون حذف از صف else: continue if item is None: # یعنی دیگر فریمی نداریم، پایان detection_queue.put(None) break frame_id, frame = item t0 = time.time() # انجام دیتکشن input_batch, scale, pad_w, pad_h = preprocess_image(frame) detections = yolo_model.infer(input_batch) final_boxes, final_scores, final_class_ids = fast_postprocess( detections, conf_threshold=0.1, iou_threshold=0.4 ) detection_time = (time.time() - t0) * 1000.0 # تبدیل به فرمت قابل استفاده در DeepSort detection_list = [] for box, score, cls_id in zip(final_boxes, final_scores, final_class_ids): x1, y1, x2, y2 = (box / scale).astype(int) x1, x2 = x1 - pad_w, x2 - pad_w y1, y2 = y1 - pad_h, y2 - pad_h detection_list.append((x1, y1, x2, y2, score, cls_id)) # قرار دادن در صف detection_queue.append((frame_id, frame, detection_list, detection_time)) # print("frame_id : ", frame_id , "frame : ", frame, "detection_list : ", detection_list) finally: ctx.pop() logging.info("[Detection Thread] Finished & CUDA context popped.")
########################################
########################################
def tracker_thread_func(detection_queue, tracking_queue, reid_engine_path):
"""
این تابع نتیجهی دیتکشن را از detection_queue برداشته و ترکینگ انجام میدهد.
سپس نتیجه را در tracking_queue میگذارد.
اگر detection_queue خالی بود، منتظر نمیماند و ادامه میدهد.
"""
cuda.init()
device = cuda.Device(0)
ctx = device.make_context()
texttry: # بارگذاری مدل ReID trt_reid_embedder = TRTReIDEmbedder( engine_path=reid_engine_path, embedding_size=128 ) trt_reid_embedder.predict = trt_reid_embedder.forward # ساخت DeepSort tracker = DeepSort( max_age=50, n_init=3, nms_max_overlap=1.0, embedder=None, embedder_gpu=True, nn_budget=100, half=False ) tracker.embedder = trt_reid_embedder track_total_list = [] while True: print("tracker") # try: # item = detection_queue.get_nowait() # except queue.Empty: # # اگر خالی بود، کمی صبر کنیم و رد شویم # time.sleep(0.01) # continue if detection_queue: # بررسی اینکه صف خالی نباشد item = detection_queue[0] # دسترسی به اولین عنصر بدون حذف آن # استفاده از item بدون حذف از صف else: continue if item is None: # سیگنال پایان tracking_queue.put(None) break frame_id, frame, detection_list , detection_time = item # تبدیل فرمت دیتکشن برای DeepSort deepsort_detections = [] for (x1, y1, x2, y2, score, cls_id) in detection_list: w, h = x2 - x1, y2 - y1 deepsort_detections.append(([x1, y1, w, h], score, cls_id, frame_id)) t0 = time.time() # ترکینگ tracks = tracker.update_tracks(deepsort_detections, frame=frame) track_time = (time.time() - t0) * 1000.0 track_total_list.append(track_time) # استخراج باکسهای نهایی ترک track_objects = [] frame_h, frame_w = frame.shape[:2] for t in tracks: if not t.is_confirmed(): continue track_id = t.track_id ltrb = t.to_ltrb() bx1 = max(0, int(ltrb[0])) by1 = max(0, int(ltrb[1])) bx2 = min(frame_w, int(ltrb[2])) by2 = min(frame_h, int(ltrb[3])) track_objects.append([track_id, bx1, by1, bx2, by2]) # # قرار دادن در صف نمایش # tracking_queue.append({ # 'frame_id': frame_id, # 'frame': frame, # 'detection_list': detection_list, # 'track_objects': bboxes, # 'detection_time': detection_time, # 'track_time': track_time # }) # قرار دادن در صف نمایش tracking_queue.append((frame_id, frame, detection_list, track_objects)) # print("tracking_queue : ", tracking_queue) if len(track_total_list) > 0: mean_track_time = np.mean(track_total_list) std_dev = np.std(track_total_list) logging.info(f"[Tracker Thread] Mean Track Time: {mean_track_time:.2f} ms") logging.info(f"[Tracker Thread] Std Dev Track Time: {std_dev:.2f} ms") finally: ctx.pop() logging.info("[Tracker Thread] Finished & CUDA context popped.")
########################################
########################################
def main():
# مسیرها
video_path = "/home/xaviernx/Videos/final_640.mkv"
yolo_engine_path = "/home/xaviernx/my files/nx/my files/model/640x640/mfnet.trt"
reid_engine_path = "/home/xaviernx/Documents/resnet18.trt"
text# ایجاد صفها frame_queue = deque(maxlen=1) detection_queue = deque(maxlen=1) tracking_queue = deque(maxlen=1) # ساخت Threadها detection_thread = threading.Thread( target=detection_thread_func, args=(frame_queue, detection_queue, yolo_engine_path), name="DetectionThread" ) tracker_thread = threading.Thread( target=tracker_thread_func, args=(detection_queue, tracking_queue, reid_engine_path), name="TrackerThread" ) display_thread = threading.Thread( target=display_thread_func, args=(video_path, frame_queue, tracking_queue), name="DisplayThread" ) # شروع Threadها detection_thread.start() tracker_thread.start() display_thread.start() # منتظر ماندن تا همه تمام شوند detection_thread.join() tracker_thread.join() display_thread.join() logging.info("[Main] All threads finished.")
if name == "main":
main()
در کد زیر، منطق صفها (Queue/Deque) بهصورت درست پیادهسازی شده است تا هر بار دادهٔ جدیدی که از مرحلهٔ قبلی تولید میشود، فقط یک بار مصرف شود (از پُشت صف اضافه و از جلوی صف برداشته میشود). همچنین در رشتهٔ نمایش (Display)، دقیقاً هر ۵۰ میلیثانیه یک فریم را نمایش میدهیم. اگر در صف ترکینگ (tracking_queue) خروجیای وجود داشت، آن را با فریم میکشیم و اگر نبود، همان فریم خام را نشان میدهیم.
نکته: در این کد از سه صف (deque) استفاده شده است:
همچنین دقت کنید که برای پایان کار هر رشته (Thread)، از گذاشتن None
در صف مربوطه استفاده شده است و هر رشته بعد از دیدن None
، کار خود را خاتمه میدهد.
pythonimport cv2 import numpy as np import time import warnings import logging import os import threading import tensorrt as trt import pycuda.driver as cuda # توجه داشته باشید از import pycuda.autoinit استفاده نشده # چون ممکن است با کانتکستهای سفارشی تداخل کند. from collections import deque from deep_sort_realtime.deepsort_tracker import DeepSort # حذف اخطارهای مربوط به np.bool در نسخههای جدید NumPy np.bool = np.bool_ warnings.filterwarnings("ignore", category=FutureWarning, message=".*np.bool.*") ######################################## # تنظیمات اولیه logging ######################################## logging.basicConfig( level=logging.INFO, # سطح ثبت لاگها format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) ######################################## # کلاسهای BaseEmbedder + TRTReIDEmbedder ######################################## class BaseEmbedder: def __init__(self, embedding_size=128): self.embedding_size = embedding_size def forward(self, x): raise NotImplementedError("Subclasses must implement this method.") class TRTReIDEmbedder(BaseEmbedder): def __init__(self, engine_path, embedding_size=128): super().__init__(embedding_size=embedding_size) self.logger = trt.Logger(trt.Logger.WARNING) self.runtime = trt.Runtime(self.logger) self.engine = self._load_engine(engine_path) self.context = self.engine.create_execution_context() self.inputs, self.outputs, self.bindings = self._allocate_buffers() def _load_engine(self, engine_path): with open(engine_path, "rb") as f: engine_data = f.read() return self.runtime.deserialize_cuda_engine(engine_data) def _allocate_buffers(self): inputs, outputs, bindings = [], [], [] for i in range(self.engine.num_bindings): name = self.engine.get_tensor_name(i) shape = self.engine.get_tensor_shape(name) dtype = self.engine.get_tensor_dtype(name) size = trt.volume(shape) np_type = trt.nptype(dtype) buf = cuda.mem_alloc(size * np_type().itemsize) bindings.append(buf) mode = self.engine.get_tensor_mode(name) if mode == trt.TensorIOMode.INPUT: inputs.append(buf) else: outputs.append(buf) return inputs, outputs, bindings def preprocess_single(self, image): # اگر کانال رنگی اشتباه باشد، اصلاح میکنیم if image.shape[0] == 3 and len(image.shape) == 3: image = image.transpose(1, 2, 0) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image = cv2.resize(image, (224, 224)) image = image.astype(np.float32) / 255.0 mean = np.array([0.485, 0.456, 0.406], dtype=np.float32) std = np.array([0.229, 0.224, 0.225], dtype=np.float32) image = (image - mean) / std image = image.transpose(2, 0, 1) image = np.expand_dims(image, axis=0) return np.ascontiguousarray(image) def forward(self, np_image_crops): if len(np_image_crops) == 0: return np.empty((0, self.embedding_size), dtype=np.float32) outputs = [] for crop in np_image_crops: processed = self.preprocess_single(crop) cuda.memcpy_htod(self.inputs[0], processed.ravel()) self.context.execute_v2(self.bindings) cuda.Context.synchronize() output_name = self.engine.get_tensor_name(1) output_shape = self.engine.get_tensor_shape(output_name) output = np.empty(output_shape, dtype=np.float32) cuda.memcpy_dtoh(output, self.outputs[0]) outputs.append(output) return np.concatenate(outputs, axis=0) ######################################## # YOLO TensorRT Inference Class ######################################## class MFNET: def __init__(self, engine_path): self.logger = trt.Logger(trt.Logger.WARNING) self.runtime = trt.Runtime(self.logger) self.engine = self._load_engine(engine_path) self.context = self.engine.create_execution_context() self.inputs, self.outputs, self.bindings = self._allocate_buffers() def _load_engine(self, engine_path): with open(engine_path, "rb") as f: engine_data = f.read() return self.runtime.deserialize_cuda_engine(engine_data) def _allocate_buffers(self): inputs, outputs, bindings = [], [], [] for i in range(self.engine.num_bindings): name = self.engine.get_tensor_name(i) shape = self.engine.get_tensor_shape(name) dtype = self.engine.get_tensor_dtype(name) size = trt.volume(shape) np_dtype = trt.nptype(dtype) buffer = cuda.mem_alloc(size * np_dtype().itemsize) bindings.append(int(buffer)) io_mode = self.engine.get_tensor_mode(name) if io_mode == trt.TensorIOMode.INPUT: inputs.append(buffer) else: outputs.append(buffer) return inputs, outputs, bindings def infer(self, input_batch): cuda.memcpy_htod(self.inputs[0], input_batch) self.context.execute_v2(self.bindings) cuda.Context.synchronize() # فرض میکنیم فقط یک خروجی با ایندکس 1 داریم output_name = self.engine.get_tensor_name(1) output_shape = self.engine.get_tensor_shape(output_name) output = np.empty(trt.volume(output_shape), dtype=np.float32) cuda.memcpy_dtoh(output, self.outputs[0]) return output.reshape(-1, 10) ######################################## # NMS / Postprocessing ######################################## def nms(boxes, scores, iou_threshold): x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] areas = (x2 - x1) * (y2 - y1) order = np.argsort(scores)[::-1] keep = [] while order.size > 0: i = order[0] keep.append(i) xx1 = np.maximum(x1[i], x1[order[1:]]) yy1 = np.maximum(y1[i], y1[order[1:]]) xx2 = np.minimum(x2[i], x2[order[1:]]) yy2 = np.minimum(y2[i], y2[order[1:]]) w = np.maximum(0.0, xx2 - xx1) h = np.maximum(0.0, yy2 - yy1) overlap = (w * h) / (areas[i] + areas[order[1:]] - w * h) order = order[np.where(overlap <= iou_threshold)[0] + 1] return np.array(keep) def fast_postprocess(output, conf_threshold=0.1, iou_threshold=0.4): if output.shape[0] == 0: return np.empty((0, 4)), np.array([]), np.array([]) boxes, conf_scores, class_scores = output[:, :4], output[:, 4], output[:, 5:] mask = conf_scores >= conf_threshold boxes, scores, class_confidences = boxes[mask], conf_scores[mask], class_scores[mask] if len(boxes) == 0: return np.empty((0, 4)), np.array([]), np.array([]) max_class_conf = np.max(class_confidences, axis=1) mask2 = max_class_conf >= conf_threshold boxes, scores, class_confidences = boxes[mask2], scores[mask2], class_confidences[mask2] if len(boxes) == 0: return np.empty((0, 4)), np.array([]), np.array([]) x_center, y_center, width, height = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] x_min = x_center - width / 2 y_min = y_center - height / 2 x_max = x_center + width / 2 y_max = y_center + height / 2 boxes_xyxy = np.stack([x_min, y_min, x_max, y_max], axis=1) inds = nms(boxes_xyxy, scores, iou_threshold) boxes_final = boxes_xyxy[inds] scores_final = scores[inds] class_ids_final = np.argmax(class_confidences[inds], axis=1) return boxes_final, scores_final, class_ids_final ######################################## # تابع کمکی تغییر سایز و آمادهسازی ورودی ######################################## def letterbox_image(image, target_size): ih, iw = image.shape[:2] h, w = target_size scale = min(w / iw, h / ih) new_w, new_h = int(iw * scale), int(ih * scale) resized_image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR) pad_w, pad_h = (w - new_w) // 2, (h - new_h) // 2 padded_image = np.full((h, w, 3), 128, dtype=np.uint8) padded_image[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized_image return padded_image, scale, pad_w, pad_h def preprocess_image(image, input_size=(640, 640)): img, scale, pad_w, pad_h = letterbox_image(image, input_size) img = img.astype(np.float32) / 255.0 img = img.transpose(2, 0, 1) img = np.expand_dims(img, axis=0) return np.ascontiguousarray(img), scale, pad_w, pad_h ######################################## # رشتهی نمایش (Display) ######################################## def display_thread_func(video_path, frame_queue, tracking_queue, output_folder='output_frames'): """ 1. ویدیو باز میشود و فریمها یکییکی خوانده میشوند. 2. هر بار فریم در صف frame_queue قرار میگیرد. 3. دقیقاً هر 50 میلیثانیه یکبار، فریم جاری را نمایش میدهیم. اگر در صف tracking_queue دیتای جدیدی موجود بود، آن را روی همین فریم اعمال میکنیم. اگر نبود، همان فریم خام را نشان میدهیم. 4. در انتها، None در صف گذاشته و خارج میشویم. """ if not os.path.exists(output_folder): os.makedirs(output_folder) cap = cv2.VideoCapture(video_path) frame_id = 0 history_tracks = deque(maxlen=20) while True: ret, frame = cap.read() if not ret: # اتمام ویدیو؛ سیگنال پایان به رشته Detection frame_queue.append(None) break # فریم جدید را در صف قرار میدهیم تا رشته Detection استفاده کند frame_queue.append((frame_id, frame.copy())) # در اینجا میخواهیم اگر ترکینگ خروجی جدید دارد، اعمال کنیم # در غیر این صورت، همان فریم خام را نشان دهیم frame_to_show = frame.copy() # بررسی میکنیم آیا صف ترکینگ داده جدیدی دارد # برای اینکه همیشه آخرین خروجی را دریافت کنیم، در حلقه میچرخیم تا همهی آیتمهای قدیمی خالی شود latest_result = None while True: if len(tracking_queue) > 0: item = tracking_queue.popleft() if item is not None: latest_result = item else: # اگر None دیدیم، یعنی کار آن رشته تمام شده # ولی ما همچنان ممکن است فریمهایی داشته باشیم تا نمایش دهیم # اینجا فعلاً رد میشویم pass else: # صف خالی شد break # اگر latest_result در طول این 50ms آپدیت شد، آن را اعمال کنیم if latest_result is not None: _, tracked_frame, detection_list, track_objects = latest_result # در اینجا میتوانیم داده را بر روی frame فعلی اعمال کنیم # ولی باید دقت کنیم که آیتمی که از tracker آمده، مربوط به کدام frame_id بوده است. # برای simplicity فرض میکنیم همان فریم باشد؛ # اما در عمل بهتر است frame_id ها را چک کنید که هماهنگ باشد. # کشیدن باکسهای دیتکشن (اختیاری) for (x1, y1, x2, y2, score, cls_id) in detection_list: cv2.rectangle(frame_to_show, (x1, y1), (x2, y2), (100, 0, 255), 2) # کشیدن باکسهای ترک for bbox in track_objects: tid, x1, y1, x2, y2 = bbox cv2.rectangle(frame_to_show, (x1, y1), (x2, y2), (255, 255, 0), 2) cv2.putText(frame_to_show, f"ID: {tid}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2, cv2.LINE_AA) # مسیر حرکت (history_tracks) - اگر نیاز دارید history_tracks.append([frame_id, track_objects]) for older_frame_id, objects in history_tracks: for obj in objects: _, hx1, hy1, hx2, hy2 = obj center_x = (hx1 + hx2) // 2 center_y = (hy1 + hy2) // 2 cv2.circle(frame_to_show, (center_x, center_y), radius=3, color=(200, 100, 0), thickness=-1) # ذخیره فریم خروجی out_path = os.path.join(output_folder, f"frame_{frame_id}.jpg") cv2.imwrite(out_path, frame_to_show) # نمایش فریم (دقیقاً هر 50 میلیثانیه) cv2.imshow("Display", frame_to_show) frame_id += 1 if cv2.waitKey(50) & 0xFF == ord('q'): break cap.release() cv2.destroyAllWindows() logging.info("[Display Thread] Finished Displaying & Reading Video.") ######################################## # رشتهی دیتکشن (Detection) ######################################## def detection_thread_func(frame_queue, detection_queue, yolo_engine_path): """ این تابع، فریمها را از frame_queue برمیدارد و روی آنها دیتکشن میزند. سپس نتیجه دیتکشن را در detection_queue قرار میدهد. وقتی None ببیند، یعنی کار تمام است و None را در detection_queue میگذارد و خارج میشود. """ cuda.init() device = cuda.Device(0) ctx = device.make_context() try: yolo_model = MFNET(yolo_engine_path) while True: # تلاش برای برداشتن یک آیتم از frame_queue if len(frame_queue) == 0: # صف خالی است time.sleep(0.01) continue item = frame_queue.popleft() if item is None: # پایان کار detection_queue.append(None) break frame_id, frame = item t0 = time.time() input_batch, scale, pad_w, pad_h = preprocess_image(frame) detections = yolo_model.infer(input_batch) final_boxes, final_scores, final_class_ids = fast_postprocess( detections, conf_threshold=0.1, iou_threshold=0.4 ) detection_time = (time.time() - t0) * 1000.0 # تبدیل به فرمت قابل استفاده در DeepSort detection_list = [] for box, score, cls_id in zip(final_boxes, final_scores, final_class_ids): x1, y1, x2, y2 = (box / scale).astype(int) x1, x2 = x1 - pad_w, x2 - pad_w y1, y2 = y1 - pad_h, y2 - pad_h detection_list.append((x1, y1, x2, y2, score, cls_id)) # نتیجه را به صف detection_queue ارسال میکنیم detection_queue.append((frame_id, frame, detection_list, detection_time)) finally: ctx.pop() logging.info("[Detection Thread] Finished & CUDA context popped.") ######################################## # رشتهی ترکینگ (Tracker) ######################################## def tracker_thread_func(detection_queue, tracking_queue, reid_engine_path): """ این تابع نتیجهی دیتکشن را از detection_queue برمیدارد و ترکینگ انجام میدهد. سپس نتیجه را در tracking_queue قرار میدهد. وقتی None در detection_queue ببیند، پایان کار را اعلام میکند. """ cuda.init() device = cuda.Device(0) ctx = device.make_context() try: trt_reid_embedder = TRTReIDEmbedder(engine_path=reid_engine_path, embedding_size=128) trt_reid_embedder.predict = trt_reid_embedder.forward tracker = DeepSort( max_age=50, n_init=3, nms_max_overlap=1.0, embedder=None, embedder_gpu=True, nn_budget=100, half=False ) tracker.embedder = trt_reid_embedder track_times = [] while True: if len(detection_queue) == 0: time.sleep(0.01) continue item = detection_queue.popleft() if item is None: # پایان کار tracking_queue.append(None) break frame_id, frame, detection_list, detection_time = item # تبدیل فرمت دیتکشن برای DeepSort deepsort_detections = [] for (x1, y1, x2, y2, score, cls_id) in detection_list: w, h = x2 - x1, y2 - y1 deepsort_detections.append(([x1, y1, w, h], score, cls_id, frame_id)) t0 = time.time() tracks = tracker.update_tracks(deepsort_detections, frame=frame) track_time = (time.time() - t0) * 1000.0 track_times.append(track_time) track_objects = [] frame_h, frame_w = frame.shape[:2] for t in tracks: if not t.is_confirmed(): continue track_id = t.track_id ltrb = t.to_ltrb() bx1 = max(0, int(ltrb[0])) by1 = max(0, int(ltrb[1])) bx2 = min(frame_w, int(ltrb[2])) by2 = min(frame_h, int(ltrb[3])) track_objects.append([track_id, bx1, by1, bx2, by2]) # خروجی را به صف tracking_queue میفرستیم tracking_queue.append((frame_id, frame, detection_list, track_objects)) if len(track_times) > 0: mean_track_time = np.mean(track_times) std_dev = np.std(track_times) logging.info(f"[Tracker Thread] Mean Track Time: {mean_track_time:.2f} ms") logging.info(f"[Tracker Thread] Std Dev Track Time: {std_dev:.2f} ms") finally: ctx.pop() logging.info("[Tracker Thread] Finished & CUDA context popped.") ######################################## # تابع اصلی ######################################## def main(): video_path = "/home/xaviernx/Videos/final_640.mkv" yolo_engine_path = "/home/xaviernx/my files/nx/my files/model/640x640/mfnet.trt" reid_engine_path = "/home/xaviernx/Documents/resnet18.trt" # ایجاد صفها frame_queue = deque(maxlen=1) detection_queue = deque(maxlen=1) tracking_queue = deque(maxlen=1) detection_thread = threading.Thread( target=detection_thread_func, args=(frame_queue, detection_queue, yolo_engine_path), name="DetectionThread" ) tracker_thread = threading.Thread( target=tracker_thread_func, args=(detection_queue, tracking_queue, reid_engine_path), name="TrackerThread" ) display_thread = threading.Thread( target=display_thread_func, args=(video_path, frame_queue, tracking_queue), name="DisplayThread" ) # شروع رشتهها detection_thread.start() tracker_thread.start() display_thread.start() # منتظر میمانیم تا همه کارشان را تمام کنند detection_thread.join() tracker_thread.join() display_thread.join() logging.info("[Main] All threads finished.") if __name__ == "__main__": main()
توضیح کلی:
display_thread_func
، ابتدا ویدیو را باز کرده و در هر تکرار، یک فریم میخوانیم و آن را در frame_queue
میگذاریم. سپس تلاش میکنیم جدیدترین خروجی موجود در tracking_queue
را (اگر باشد) بخوانیم و بر روی فریم اعمال کنیم. در نهایت با cv2.waitKey(50)
دقیقاً هر ۵۰ میلیثانیه، فریم را نمایش میدهیم.detection_thread_func
) فریم را از frame_queue
دریافت کرده، مدل YOLO را اجرا میکنیم و خروجی دیتکشن را در detection_queue
قرار میدهیم.tracker_thread_func
) نتایج دیتکشن را از detection_queue
برداشته، عملیات DeepSort را انجام میدهیم و نتایج را در tracking_queue
قرار میدهیم.None
در صف ورودی)، با قرار دادن None
در صف خروجی، رشتهٔ بعدی را نیز از اتمام کار مطلع میکند و همگی خاتمه مییابند.با این روش، دقیقاً هر ۵۰ms یکبار فریم نمایش داده میشود و اگر در این فاصله، نتیجهای در صف ترکینگ وجود داشته باشد، آن نتیجه بر روی فریم اعمال خواهد شد. در غیر این صورت، فریم خام را نمایش میدهیم.