# Filename: wafer_wafer_640x480_mp4_queue_arch_fixed_v3....

Created: September 22, 2025

Answered using GPT-5 by Chat01

Question

Filename: wafer_wafer_640x480_mp4_queue_arch_fixed_v3.py

```python
import cv2
import numpy as np
import cvzone
import time
import os
import sys
import shutil
import platform
import datetime
import threading
import socket
from queue import Queue, Empty, Full
import traceback  # Import for detailed error logging
from ultralytics import YOLO
from MBTS import MBTS
import wafer_seg_640x480_video

# ==================== Program settings ====================
YOLO_PIN_MODEL_PATH = 'best_slotIR_640x480.pt'
YOLO_SEG_MODEL_PATH = 'best_result_640x480.pt'
CLASSES_FILE = 'classes1.txt'
SAVE_DIR_TRACKED = "RED_circuit"
SAVE_DIR_GREEN = "Green_square"
RECORDS_DIR = "Records"
CAMERA_INDEX = 0
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
FPS = 60.0
EXPOSURE = -7
BRIGHTNESS = 40.0
CONTRAST = 32.0
TCP_PORT = 5656
line = [550, 260, 620, 260]
BLUE_RECT_X_START = 10
BLUE_RECT_X_END = 510
BLUE_RECT_FIXED_HEIGHT = 100

# ==================== Optimization parameters ====================
TRIGGER_LINE_Y = line[1]
TRIGGER_TOLERANCE = 10

# ==================== Queue settings ====================
record_queue = Queue(maxsize=int(FPS * 2))
process_queue = Queue(maxsize=1)
display_queue = Queue(maxsize=1)

# ======================== Global variables ========================
analysis_lock = threading.Lock()
video_writer = None
clients = []
recordIndex = 0
exit_flag = False
recording_flag = False
processing_active = False

# These variables below are the shared state that needs protection
wafer_count = 0
tracked_counter = 0
counted_ids = set()
save_counter_green = 1
previous_positions = {}
tracker = None

# ---
yolo_pin_model = None
model_loaded_event = threading.Event()

# ==================== Helper functions (unchanged) ====================
def write_log(message):
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] {message}")

def clear_directory(dir_path):
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
        return
    write_log(f"Clearing directory: {dir_path}...")
    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            write_log(f'Failed to delete {file_path}. Reason: {e}')

def setup_directories():
    for dir_path in [SAVE_DIR_GREEN, RECORDS_DIR, SAVE_DIR_TRACKED]:
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
            write_log(f"Created directory: {dir_path}")

# ==================== Core functions (unchanged) ====================
def reset_analysis_state():
    global wafer_count, counted_ids, save_counter_green, previous_positions, tracked_counter, tracker
    with analysis_lock:
        wafer_count = 0
        tracked_counter = 0
        counted_ids = set()
        save_counter_green = 1
        previous_positions = {}
        tracker = MBTS(max_age=90, min_hits=3, iou_threshold=0.1)
        clear_directory(SAVE_DIR_GREEN)
        clear_directory(SAVE_DIR_TRACKED)
        # clear_directory(RECORDS_DIR)  # Records is normally not cleared unless specifically needed
    for q in [record_queue, process_queue, display_queue]:
        while not q.empty():
            try:
                q.get_nowait()
            except Empty:
                break
    write_log("Analysis state, tracker, and all queues have been reset.")

# ==================== Camera and recording ====================
def open_camera():
    backend = cv2.CAP_DSHOW if platform.system() == "Windows" else cv2.CAP_ANY
    cap = cv2.VideoCapture(CAMERA_INDEX, backend)
    if not cap.isOpened():
        write_log(f"Error: cannot open camera index {CAMERA_INDEX}")
        return None
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
    cap.set(cv2.CAP_PROP_FPS, FPS)
    cap.set(cv2.CAP_PROP_EXPOSURE, EXPOSURE)
    cap.set(cv2.CAP_PROP_BRIGHTNESS, BRIGHTNESS)
    cap.set(cv2.CAP_PROP_CONTRAST, CONTRAST)
    write_log("Camera parameters configured")
    time.sleep(1)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    write_log(f"Camera opened successfully. Actual resolution: {w}x{h}")
    return cap

def camera_capture_thread(cap):
    global exit_flag
    while not exit_flag:
        ret, frame = cap.read()
        if not ret:
            write_log("Cannot read frame from camera; stopping capture.")
            break
        try:
            process_queue.get_nowait()  # Discard the stale frame
        except Empty:
            pass
        try:
            process_queue.put_nowait(frame)
        except Full:
            pass  # Ignore; wait for the next frame
        if recording_flag:
            try:
                record_queue.put_nowait(frame.copy())
            except Full:
                write_log("Warning: record queue is full, frames may be dropped!")
    cap.release()
    write_log("Camera capture thread stopped.")

def create_video_writer(width, height, cap):
    global recordIndex
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    now_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    recordIndex += 1
    filename = os.path.join(RECORDS_DIR, f"record_{now_str}_{recordIndex}.mp4")
    real_fps = cap.get(cv2.CAP_PROP_FPS)
    if real_fps <= 0:  # Some cameras report 0
        real_fps = 30.0
    writer = cv2.VideoWriter(filename, fourcc, real_fps, (width, height))
    write_log(f"Recording started, saving to: {filename} (FPS={real_fps})")
    return writer

# <<< FIX 1: change the function signature to take a cap argument
def record_thread(cap):
    global video_writer, exit_flag
    while not exit_flag:
        if recording_flag:
            try:
                frame_to_record = record_queue.get(timeout=1)
                if frame_to_record is None:  # Termination signal
                    break
                if video_writer is None:
                    h, w, _ = frame_to_record.shape
                    # <<< FIX 2: pass cap when calling create_video_writer
                    video_writer = create_video_writer(w, h, cap)
                video_writer.write(frame_to_record)
            except Empty:
                continue
        else:
            if video_writer is not None:
                video_writer.release()
                video_writer = None
                write_log("Recording stopped and file saved.")
            time.sleep(0.1)

# ==================== TCP communication & model loading (unchanged) ====================
def handle_client(client_socket, addr):
    global recording_flag, processing_active
    try:
        while not exit_flag:
            data = client_socket.recv(1024)
            if not data:
                break
            msg = data.decode().strip().lower()
            if msg == "start":
                if not model_loaded_event.is_set():
                    client_socket.sendall("Model is still loading, please wait...\n".encode())
                    continue
                if not processing_active:
                    reset_analysis_state()
                    processing_active = True
                    recording_flag = True
                    client_socket.sendall("Analysis and recording started.\n".encode())
                    write_log(f"Received 'start' command from {addr}.")
                else:
                    client_socket.sendall("Already in processing state.\n".encode())
            elif msg == "stop":
                if processing_active:
                    write_log(f"Received 'stop' command from {addr}; stopping processing...")
                    processing_active = False
                    recording_flag = False
                    client_socket.sendall("Processing stopped. Generating final report...\n".encode())
                    write_log("Running final analysis...")
                    all_predictions = wafer_seg_640x480_video.analyze_images(model_path=YOLO_SEG_MODEL_PATH, image_folder=SAVE_DIR_GREEN)
                    write_log(f"Analysis complete, results: {all_predictions}")
                    final_response = f"Result: {','.join(map(str, all_predictions))}\n" if all_predictions is not None else "Result: Error: Analysis failed to produce results.\n"
                    client_socket.sendall(final_response.encode('utf-8'))
                    write_log(f"Analysis results sent to {addr}. System returning to standby.")
                else:
                    client_socket.sendall("Not in processing state.\n".encode())
            else:
                client_socket.sendall(f"Unknown command: {msg}\n".encode())
    except (ConnectionResetError, BrokenPipeError):
        write_log(f"Client {addr} forcibly closed the connection.")
    except Exception as e:
        write_log(f"Error communicating with client {addr}: {e}")
    finally:
        client_socket.close()
        clients.remove(client_socket)
        write_log(f"Client {addr} disconnected.")

def tcp_server_thread():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(("0.0.0.0", TCP_PORT))
    server_socket.listen(5)
    write_log(f"TCP server started, listening on port {TCP_PORT}")
    while not exit_flag:
        try:
            server_socket.settimeout(1.0)
            client_socket, addr = server_socket.accept()
            clients.append(client_socket)
            write_log(f"Client {addr} connected.")
            thread = threading.Thread(target=handle_client, args=(client_socket, addr), daemon=True)
            thread.start()
        except socket.timeout:
            continue
    server_socket.close()
    write_log("TCP server closed.")

def load_model_thread():
    global yolo_pin_model
    try:
        write_log(f"Loading YOLO model: {YOLO_PIN_MODEL_PATH}...")
        yolo_pin_model = YOLO(YOLO_PIN_MODEL_PATH)
        write_log("YOLO model loaded successfully!")
        model_loaded_event.set()
    except Exception as e:
        write_log(f"Failed to load YOLO model: {e}")

# ==================== Inference thread (unchanged) ====================
def inference_thread():
    global exit_flag, wafer_count, tracked_counter, save_counter_green, previous_positions, counted_ids
    while not exit_flag:
        try:
            frame_orig = process_queue.get(timeout=1)
            if frame_orig is None:
                break
            frame = frame_orig.copy()
            if processing_active:
                tracks = np.empty((0, 5))
                trigger_object_cy = None
                height, _, _ = frame.shape
                with analysis_lock:
                    results = yolo_pin_model(frame, stream=True, verbose=False, conf=0.25)
                    detections = [[int(x) for x in box.xyxy[0]] + [float(box.conf[0])] for info in results for box in info.boxes]
                    if tracker is not None:
                        tracks = tracker.update(np.array(detections) if detections else np.empty((0, 5)))
                    current_positions_in_frame = {}
                    if tracks.size > 0:
                        for track in tracks:
                            x1, y1, x2, y2, track_id = map(int, track)
                            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
                            current_positions_in_frame[track_id] = cy
                            if track_id in previous_positions:
                                prev_cy = previous_positions[track_id]
                                if prev_cy > (TRIGGER_LINE_Y + TRIGGER_TOLERANCE) and cy <= (TRIGGER_LINE_Y + TRIGGER_TOLERANCE) and track_id not in counted_ids:
                                    counted_ids.add(track_id)
                                    wafer_count += 1
                                    write_log(f"Object ID {track_id} crossed the counting line, total: {wafer_count}")
                                    trigger_object_cy = cy
                    if trigger_object_cy is not None:
                        rect_y1 = max(0, trigger_object_cy - BLUE_RECT_FIXED_HEIGHT // 4 - 10)
                        rect_y2 = min(height, trigger_object_cy + BLUE_RECT_FIXED_HEIGHT // 4 - 10)
                        rect_x1, rect_x2 = BLUE_RECT_X_START, BLUE_RECT_X_END
                        crop = frame_orig[rect_y1:rect_y2, rect_x1:rect_x2]
                        if crop.size > 0:
                            save_path = os.path.join(SAVE_DIR_GREEN, f"crop_{save_counter_green:04d}.jpg")
                            cv2.imwrite(save_path, crop)
                            save_counter_green += 1
                    previous_positions = current_positions_in_frame
                # --- Drawing logic ---
                cv2.line(frame, (line[0], line[1]), (line[2], line[1]), (0, 255, 0), thickness=2)
                cv2.line(frame, (line[0], TRIGGER_LINE_Y + TRIGGER_TOLERANCE), (line[2], TRIGGER_LINE_Y + TRIGGER_TOLERANCE), (0, 100, 0), thickness=1, lineType=cv2.LINE_AA)
                if tracks.size > 0:
                    for track in tracks:
                        x1, y1, x2, y2, track_id = map(int, track)
                        cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 255), 2)
                        cvzone.putTextRect(frame, str(track_id), [x1, cy + 5], scale=0.8, thickness=1, colorR=(0, 255, 255), colorT=(255, 0, 0), offset=5)
                if trigger_object_cy is not None:
                    rect_y1 = max(0, trigger_object_cy - BLUE_RECT_FIXED_HEIGHT // 4 - 10)
                    rect_y2 = min(height, trigger_object_cy + BLUE_RECT_FIXED_HEIGHT // 4 - 10)
                    cv2.rectangle(frame, (BLUE_RECT_X_START, rect_y1), (BLUE_RECT_X_END, rect_y2), (255, 0, 0), 3)
                cvzone.putTextRect(frame, f'Count: {wafer_count}', (320, 50), scale=1, thickness=2, colorT=(0, 0, 0), colorR=(255, 127, 0))
                cvzone.putTextRect(frame, "STATUS: PROCESSING", (10, 30), scale=1, thickness=2, colorR=(0, 200, 0))
                if tracks.size > 0:
                    with analysis_lock:
                        try:
                            save_path = os.path.join(SAVE_DIR_TRACKED, f"tracked_{tracked_counter:04d}.jpg")
                            cv2.imwrite(save_path, frame)
                            tracked_counter += 1
                        except Exception as e:
                            write_log(f"Failed to save tracked frame to {SAVE_DIR_TRACKED}: {e}")
            else:
                if not model_loaded_event.is_set():
                    cvzone.putTextRect(frame, "STATUS: LOADING MODEL...", (10, 30), scale=1, thickness=2, colorR=(0, 165, 255))
                else:
                    cvzone.putTextRect(frame, "STATUS: WAITING FOR START COMMAND", (10, 30), scale=1, thickness=2, colorR=(0, 165, 255))
            try:
                display_queue.get_nowait()
            except Empty:
                pass
            try:
                display_queue.put_nowait(frame)
            except Full:
                pass
        except Empty:
            time.sleep(0.01)
        except Exception as e:
            write_log(f"!!! Fatal error in inference thread: {e}")
            traceback.print_exc()
            time.sleep(1)

# ==================== Main program ====================
def main():
    global exit_flag
    setup_directories()
    reset_analysis_state()
    cap = open_camera()
    if cap is None:
        exit_flag = True
        sys.exit()
    # <<< FIX 3: create the threads that need cap only after cap exists
    all_threads = [
        threading.Thread(target=load_model_thread, daemon=True),
        threading.Thread(target=tcp_server_thread, daemon=True),
        threading.Thread(target=record_thread, args=(cap,), daemon=True),  # Pass cap in
        threading.Thread(target=camera_capture_thread, args=(cap,), daemon=True, name="camera_capture_thread"),
        threading.Thread(target=inference_thread, daemon=True, name="inference_thread")
    ]
    for t in all_threads:
        t.start()
    prev_time = 0
    write_log("Program started. Waiting for the model to finish loading and for the 'start' command...")
    while not exit_flag:
        try:
            display_frame = display_queue.get(timeout=1)
            if display_frame is None:
                break
            curr_time = time.time()
            if prev_time > 0:
                fps = 1 / (curr_time - prev_time)
                cv2.putText(display_frame, f"Display FPS: {int(fps)}", (CAMERA_WIDTH - 200, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
            prev_time = curr_time
            cv2.imshow('Wafer Detection Service', display_frame)
        except Empty:
            inference_is_alive = any(t.name == 'inference_thread' and t.is_alive() for t in all_threads)
            if not inference_is_alive:
                write_log("Error: inference thread terminated unexpectedly! Check the logs. Exiting.")
                exit_flag = True
            else:
                write_log("Warning: display queue has had no frames for a while, but the inference thread is still running.")
            continue
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q') or cv2.getWindowProperty('Wafer Detection Service', cv2.WND_PROP_VISIBLE) < 1:
            write_log("Manual exit requested...")
            exit_flag = True
    exit_flag = True
    for q in [record_queue, process_queue, display_queue]:
        try:
            q.put_nowait(None)
        except Full:
            pass
    for t in all_threads:
        t.join(timeout=2.0)
    cv2.destroyAllWindows()
    write_log("Main loop ended, resources released.")

if __name__ == "__main__":
    main()
    write_log("Main program finished.")
```

In the program above, why does 60 seconds of actual recording produce an MP4 that plays back in only 30 seconds?

Thinking

Investigating video duration issue

It looks like the video is playing at half the expected duration because of a mismatch between the capture and write frame rates. The capture might be at 30fps, even though the writer is set to 60fps. If the writer's frame rate exceeds the capture rate, the video will play faster.

Identifying video frame issue

It seems like the writer is set to 60fps, while the actual captured rate is 30fps. This results in halved video duration, as frames are written at twice the expected rate. To fix it, the best approach could be to measure the actual capture rate and adjust the writer accordingly, or switch to time-based writing.

Diagnosing frame rate issues

The user's video might have lower duration due to frame drops from a full record queue, which silently reduces frame count. The frame rate mismatch between capture and writer, along with potential exposure settings, contributes. The recommended solution is logging actual frame rates with timestamps and using a rolling FPS calculation for accuracy.

Improving video frame handling

To prevent skipped frames, I need to manage the frame rate carefully. One option is to set the writer FPS to the measured FPS, using timestamps. Alternatively, I could block on the record queue or adjust the thread handling for smoother frame writing. Tracking timestamps for each frame might also help.

Thought for 45s

Answer

Short answer: the nominal FPS written into the MP4 doesn't match the rate at which frames were actually written, and on top of that you silently drop frames whenever the record queue is full. A player derives the duration purely from the FPS declared in the file, so 60 seconds of real capture plays back as 30 seconds.


What is happening?

  1. The VideoWriter FPS is probably set too high
    You decide the output FPS with this snippet:

```python
real_fps = cap.get(cv2.CAP_PROP_FPS)
writer = cv2.VideoWriter(filename, fourcc, real_fps, (width, height))
```

In practice, many cameras/backends (especially on Windows) will report 60 after you call cap.set(FPS=60), but actually deliver only ~30 fps.
→ Result: in 60 seconds you write only ~1800 frames, yet the file header says 60 fps, so the player computes a duration of 1800/60 = 30 seconds.

  2. You drop frames when the record queue is full

```python
if recording_flag:
    try:
        record_queue.put_nowait(frame.copy())
    except Full:
        write_log("Warning: record queue is full, frames may be dropped!")
```

This is non-blocking and simply discards frames. The frame count drops further → actual frames written < header FPS × recording time → the playback time shrinks even more.

With both issues present at the same time, "record for 60 s, end up with a 30 s file" is exactly what you would expect.
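To make the arithmetic concrete, here is a minimal sketch; the 30 fps true capture rate and the 60-second recording time are assumed values for illustration:

```python
# Assumed for illustration: the camera actually delivers ~30 fps,
# while the MP4 header claims 60 fps.
true_capture_fps = 30.0
header_fps = 60.0
recording_seconds = 60.0

frames_written = true_capture_fps * recording_seconds  # ~1800 frames on disk
playback_seconds = frames_written / header_fps         # players trust the header
print(playback_seconds)  # 30.0 -> a 60 s recording plays back in 30 s
```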


Quick fix (minimal changes)

A. Create the VideoWriter from the measured actual FPS

Before you start writing the file, measure the average frame rate over 1–2 seconds using timestamps, and use the measured value as the VideoWriter's fps (don't trust cap.get):

```python
import time

def measure_capture_fps(cap, sample_seconds=2.0):
    frames = 0
    start = time.perf_counter()
    while time.perf_counter() - start < sample_seconds:
        ret, _ = cap.read()
        if not ret:
            break
        frames += 1
    elapsed = time.perf_counter() - start
    return frames / elapsed if elapsed > 0 else 30.0
```

Change create_video_writer to use that measurement (or store the measured value in an outer variable and pass it in):

```python
def create_video_writer(width, height, fps_for_file):
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    now_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = os.path.join(RECORDS_DIR, f"record_{now_str}_{recordIndex}.mp4")
    writer = cv2.VideoWriter(filename, fourcc, fps_for_file, (width, height))
    write_log(f"Recording started, saving to: {filename} (FPS={fps_for_file})")
    return writer
```

Then, in the main program, measure once after opening the camera and before starting the threads, store the result as measured_fps, and pass it into record_thread for creating the writer.

B. Don't drop frames while recording (at least not in the recording pipeline)

Replace record_queue.put_nowait(...) with a blocking put(...), or give it a reasonable timeout, to avoid silently dropping frames:

```python
# Inside camera_capture_thread
if recording_flag:
    try:
        record_queue.put(frame.copy(), timeout=0.5)  # Wait briefly instead of dropping frames
    except Full:
        write_log("Warning: record queue congested (frame drop avoided where possible)")
```

Enlarging record_queue a bit (e.g., FPS * 4) also helps, but the real key is not to use the non-blocking put_nowait.


Advanced suggestions (more robust)

  • Pace the pipeline by write capability: in record_thread, log the time of every frame written; if writing falls behind, reduce the capture rate in camera_capture_thread (cv2.CAP_PROP_FPS is not always honored; grabbing one frame per fixed interval is more reliable) so that supply ≈ write capability.
  • Switch to a scheme that can write variable frame timestamps (VFR): OpenCV's VideoWriter only accepts a fixed FPS. For a fully accurate timeline, stream through ffmpeg and feed it a PTS per frame, or save the raw frames plus timestamps first and rebuild the video from the timestamps with ffmpeg afterwards (see the sketch after this list).
  • Confirm the camera really supports 640×480 @ 60 fps: many webcams only deliver 30 fps at that resolution. Check the supported modes with the vendor tool or via Media Foundation/DirectShow; if it tops out at 30, lock the whole pipeline to 30 fps to avoid the mismatch.
  • Don't trust cap.get(CAP_PROP_FPS): treat it as a reference only and use the measured value in practice.
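For the VFR route, here is a minimal sketch of the rebuild-from-timestamps option above, assuming ffmpeg is installed and on PATH; the helper names and file paths are illustrative, not part of the original program:

```python
# A sketch of rebuilding a video from saved frames + capture timestamps,
# using ffmpeg's concat demuxer (per-frame "duration" directives) instead
# of OpenCV's fixed-FPS VideoWriter. Assumes ffmpeg is on PATH.
import subprocess

def write_concat_list(frame_paths, timestamps, list_path="frames.txt"):
    # Each "file" entry may be followed by a "duration"; we use the gap
    # to the next capture timestamp as that frame's display duration.
    with open(list_path, "w") as f:
        for i, path in enumerate(frame_paths):
            f.write(f"file '{path}'\n")
            if i + 1 < len(timestamps):
                f.write(f"duration {timestamps[i + 1] - timestamps[i]:.6f}\n")
    return list_path

def build_vfr_video(list_path, out_path="out_vfr.mp4"):
    subprocess.run(
        ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path,
         "-vsync", "vfr", "-pix_fmt", "yuv420p", out_path],
        check=True,
    )
```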

Why isn't it something else?

  • A file that plays back at half its true duration is the classic symptom of a frame count vs. labeled FPS mismatch. If the encoder had failed mid-write or the timebase were corrupted, the usual symptoms are a player showing the normal duration but stuttering playback, or a file that cannot be played at all; that doesn't match what you're seeing.

Small example: wiring the measured FPS through (illustrative snippet)

```python
# In main(), after opening the camera:
measured_fps = measure_capture_fps(cap, sample_seconds=1.5)
write_log(f"Measured capture FPS ≈ {measured_fps:.2f}")

# In record_thread, use measured_fps when creating the writer
def record_thread(cap, measured_fps):
    global video_writer, exit_flag
    while not exit_flag:
        if recording_flag:
            try:
                frame_to_record = record_queue.get(timeout=1)
                if frame_to_record is None:
                    break
                if video_writer is None:
                    h, w, _ = frame_to_record.shape
                    video_writer = create_video_writer(w, h, measured_fps)
                video_writer.write(frame_to_record)
            except Empty:
                continue
        else:
            if video_writer is not None:
                video_writer.release()
                video_writer = None
                write_log("Recording stopped and file saved.")
            time.sleep(0.1)
```

Note: you have already wired record_thread up to cap correctly; just pass measured_fps in as one more argument.


If you like, I can apply these changes directly to your file, or add real-time FPS / frame-drop logging so you can verify the effect of the fix.

Question

Please provide the complete program.

Thinking

Providing full code with adjustments

User wants the full program with fixes. I'll provide the full updated script but be cautious, as I can't run code. I'll ensure it meets constraints: using a measured FPS, a blocking queue for recording, and larger queue size. Functions and imports will stay consistent, and modifications will be marked. I'll also adjust for thread setup and frame handling.

Updating the program flow

Okay, I need to measure FPS before starting the threads. I'll measure by temporarily opening the camera, then reset it afterward. This helps avoid interfering with the capture process and ensures we only use the frame grab after measurement. I'll update the queue sizes, pass the measured FPS to the record thread, and ensure the filename logic and client handling stay unchanged.

Thought for 10s

Answer

No problem! Below is the complete corrected program. It fixes the key causes of the halved video duration:

  • The VideoWriter is created from the measured capture FPS (not cap.get(CAP_PROP_FPS)), so the nominal FPS matches the actual write rate.
  • The record queue now uses a blocking put (with a timeout), so frames are no longer silently dropped, which previously left the file short of frames.
  • The record queue capacity is enlarged moderately (FPS * 4).

Just replace the entire contents of your file with the program below:

```python
# Filename: wafer_wafer_640x480_mp4_queue_arch_fixed_v3.py
import cv2
import numpy as np
import cvzone
import time
import os
import sys
import shutil
import platform
import datetime
import threading
import socket
from queue import Queue, Empty, Full
import traceback  # Import for detailed error logging
from ultralytics import YOLO
from MBTS import MBTS
import wafer_seg_640x480_video

# ==================== Program settings ====================
YOLO_PIN_MODEL_PATH = 'best_slotIR_640x480.pt'
YOLO_SEG_MODEL_PATH = 'best_result_640x480.pt'
CLASSES_FILE = 'classes1.txt'
SAVE_DIR_TRACKED = "RED_circuit"
SAVE_DIR_GREEN = "Green_square"
RECORDS_DIR = "Records"
CAMERA_INDEX = 0
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
FPS = 60.0
EXPOSURE = -7
BRIGHTNESS = 40.0
CONTRAST = 32.0
TCP_PORT = 5656
line = [550, 260, 620, 260]
BLUE_RECT_X_START = 10
BLUE_RECT_X_END = 510
BLUE_RECT_FIXED_HEIGHT = 100

# ==================== Optimization parameters ====================
TRIGGER_LINE_Y = line[1]
TRIGGER_TOLERANCE = 10

# ==================== Queue settings ====================
# Enlarge the record queue to absorb momentary congestion; the display and
# process queues keep their single-frame overwrite semantics
record_queue = Queue(maxsize=int(FPS * 4))
process_queue = Queue(maxsize=1)
display_queue = Queue(maxsize=1)

# ======================== Global variables ========================
analysis_lock = threading.Lock()
video_writer = None
clients = []
recordIndex = 0
exit_flag = False
recording_flag = False
processing_active = False

# The shared state below needs protection
wafer_count = 0
tracked_counter = 0
counted_ids = set()
save_counter_green = 1
previous_positions = {}
tracker = None

# ---
yolo_pin_model = None
model_loaded_event = threading.Event()

# ==================== Helper functions ====================
def write_log(message):
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] {message}")

def clear_directory(dir_path):
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
        return
    write_log(f"Clearing directory: {dir_path}...")
    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            write_log(f'Failed to delete {file_path}. Reason: {e}')

def setup_directories():
    for dir_path in [SAVE_DIR_GREEN, RECORDS_DIR, SAVE_DIR_TRACKED]:
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
            write_log(f"Created directory: {dir_path}")

def measure_capture_fps(cap, sample_seconds=1.5):
    """
    Measure the camera's actual capture frame rate instead of relying on
    cap.get(CAP_PROP_FPS), whose nominal value may not match reality.
    Call this before starting any capture threads.
    """
    frames = 0
    start = time.perf_counter()
    while time.perf_counter() - start < sample_seconds:
        ret, _ = cap.read()
        if not ret:
            break
        frames += 1
    elapsed = time.perf_counter() - start
    measured = frames / elapsed if elapsed > 0 else 30.0
    return measured

# ==================== Core functions ====================
def reset_analysis_state():
    global wafer_count, counted_ids, save_counter_green, previous_positions, tracked_counter, tracker
    with analysis_lock:
        wafer_count = 0
        tracked_counter = 0
        counted_ids = set()
        save_counter_green = 1
        previous_positions = {}
        tracker = MBTS(max_age=90, min_hits=3, iou_threshold=0.1)
        clear_directory(SAVE_DIR_GREEN)
        clear_directory(SAVE_DIR_TRACKED)
        # clear_directory(RECORDS_DIR)  # Records is normally not cleared unless specifically needed
    for q in [record_queue, process_queue, display_queue]:
        while not q.empty():
            try:
                q.get_nowait()
            except Empty:
                break
    write_log("Analysis state, tracker, and all queues have been reset.")

# ==================== Camera and recording ====================
def open_camera():
    backend = cv2.CAP_DSHOW if platform.system() == "Windows" else cv2.CAP_ANY
    cap = cv2.VideoCapture(CAMERA_INDEX, backend)
    if not cap.isOpened():
        write_log(f"Error: cannot open camera index {CAMERA_INDEX}")
        return None
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
    cap.set(cv2.CAP_PROP_FPS, FPS)
    cap.set(cv2.CAP_PROP_EXPOSURE, EXPOSURE)
    cap.set(cv2.CAP_PROP_BRIGHTNESS, BRIGHTNESS)
    cap.set(cv2.CAP_PROP_CONTRAST, CONTRAST)
    write_log("Camera parameters configured")
    time.sleep(1)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    write_log(f"Camera opened successfully. Actual resolution: {w}x{h}")
    return cap

def camera_capture_thread(cap):
    global exit_flag
    while not exit_flag:
        ret, frame = cap.read()
        if not ret:
            write_log("Cannot read frame from camera; stopping capture.")
            break
        # Latest frame for inference: overwrite the stale one
        try:
            process_queue.get_nowait()
        except Empty:
            pass
        try:
            process_queue.put_nowait(frame)
        except Full:
            pass  # Ignore; wait for the next frame
        # For recording: use a blocking put with a timeout to avoid silently dropping frames
        if recording_flag:
            try:
                record_queue.put(frame.copy(), timeout=0.5)
            except Full:
                write_log("Warning: record queue congested (frame drop avoided where possible)")
    cap.release()
    write_log("Camera capture thread stopped.")

def create_video_writer(width, height, fps_for_file):
    global recordIndex
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    now_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    recordIndex += 1
    filename = os.path.join(RECORDS_DIR, f"record_{now_str}_{recordIndex}.mp4")
    writer = cv2.VideoWriter(filename, fourcc, fps_for_file, (width, height))
    write_log(f"Recording started, saving to: {filename} (FPS_for_file={fps_for_file:.2f})")
    return writer

def record_thread(fps_for_file):
    """
    Recording thread: create the VideoWriter with the measured FPS so the
    video duration is not compressed.
    """
    global video_writer, exit_flag
    while not exit_flag:
        if recording_flag:
            try:
                frame_to_record = record_queue.get(timeout=1)
                if frame_to_record is None:  # Termination signal
                    break
                if video_writer is None:
                    h, w, _ = frame_to_record.shape
                    video_writer = create_video_writer(w, h, fps_for_file)
                video_writer.write(frame_to_record)
            except Empty:
                continue
        else:
            if video_writer is not None:
                video_writer.release()
                video_writer = None
                write_log("Recording stopped and file saved.")
            time.sleep(0.1)

# ==================== TCP communication & model loading ====================
def handle_client(client_socket, addr):
    global recording_flag, processing_active
    try:
        while not exit_flag:
            data = client_socket.recv(1024)
            if not data:
                break
            msg = data.decode().strip().lower()
            if msg == "start":
                if not model_loaded_event.is_set():
                    client_socket.sendall("Model is still loading, please wait...\n".encode())
                    continue
                if not processing_active:
                    reset_analysis_state()
                    processing_active = True
                    recording_flag = True
                    client_socket.sendall("Analysis and recording started.\n".encode())
                    write_log(f"Received 'start' command from {addr}.")
                else:
                    client_socket.sendall("Already in processing state.\n".encode())
            elif msg == "stop":
                if processing_active:
                    write_log(f"Received 'stop' command from {addr}; stopping processing...")
                    processing_active = False
                    recording_flag = False
                    client_socket.sendall("Processing stopped. Generating final report...\n".encode())
                    write_log("Running final analysis...")
                    all_predictions = wafer_seg_640x480_video.analyze_images(model_path=YOLO_SEG_MODEL_PATH, image_folder=SAVE_DIR_GREEN)
                    write_log(f"Analysis complete, results: {all_predictions}")
                    final_response = f"Result: {','.join(map(str, all_predictions))}\n" if all_predictions is not None else "Result: Error: Analysis failed to produce results.\n"
                    client_socket.sendall(final_response.encode('utf-8'))
                    write_log(f"Analysis results sent to {addr}. System returning to standby.")
                else:
                    client_socket.sendall("Not in processing state.\n".encode())
            else:
                client_socket.sendall(f"Unknown command: {msg}\n".encode())
    except (ConnectionResetError, BrokenPipeError):
        write_log(f"Client {addr} forcibly closed the connection.")
    except Exception as e:
        write_log(f"Error communicating with client {addr}: {e}")
    finally:
        client_socket.close()
        if client_socket in clients:
            clients.remove(client_socket)
        write_log(f"Client {addr} disconnected.")

def tcp_server_thread():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(("0.0.0.0", TCP_PORT))
    server_socket.listen(5)
    write_log(f"TCP server started, listening on port {TCP_PORT}")
    while not exit_flag:
        try:
            server_socket.settimeout(1.0)
            client_socket, addr = server_socket.accept()
            clients.append(client_socket)
            write_log(f"Client {addr} connected.")
            thread = threading.Thread(target=handle_client, args=(client_socket, addr), daemon=True)
            thread.start()
        except socket.timeout:
            continue
    server_socket.close()
    write_log("TCP server closed.")

def load_model_thread():
    global yolo_pin_model
    try:
        write_log(f"Loading YOLO model: {YOLO_PIN_MODEL_PATH}...")
        yolo_pin_model = YOLO(YOLO_PIN_MODEL_PATH)
        write_log("YOLO model loaded successfully!")
        model_loaded_event.set()
    except Exception as e:
        write_log(f"Failed to load YOLO model: {e}")

# ==================== Inference thread ====================
def inference_thread():
    global exit_flag, wafer_count, tracked_counter, save_counter_green, previous_positions, counted_ids
    while not exit_flag:
        try:
            frame_orig = process_queue.get(timeout=1)
            if frame_orig is None:
                break
            frame = frame_orig.copy()
            if processing_active:
                tracks = np.empty((0, 5))
                trigger_object_cy = None
                height, _, _ = frame.shape
                with analysis_lock:
                    results = yolo_pin_model(frame, stream=True, verbose=False, conf=0.25)
                    detections = [[int(x) for x in box.xyxy[0]] + [float(box.conf[0])] for info in results for box in info.boxes]
                    if tracker is not None:
                        tracks = tracker.update(np.array(detections) if detections else np.empty((0, 5)))
                    current_positions_in_frame = {}
                    if tracks.size > 0:
                        for track in tracks:
                            x1, y1, x2, y2, track_id = map(int, track)
                            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
                            current_positions_in_frame[track_id] = cy
                            if track_id in previous_positions:
                                prev_cy = previous_positions[track_id]
                                if prev_cy > (TRIGGER_LINE_Y + TRIGGER_TOLERANCE) and cy <= (TRIGGER_LINE_Y + TRIGGER_TOLERANCE) and track_id not in counted_ids:
                                    counted_ids.add(track_id)
                                    wafer_count += 1
                                    write_log(f"Object ID {track_id} crossed the counting line, total: {wafer_count}")
                                    trigger_object_cy = cy
                    if trigger_object_cy is not None:
                        rect_y1 = max(0, trigger_object_cy - BLUE_RECT_FIXED_HEIGHT // 4 - 10)
                        rect_y2 = min(height, trigger_object_cy + BLUE_RECT_FIXED_HEIGHT // 4 - 10)
                        rect_x1, rect_x2 = BLUE_RECT_X_START, BLUE_RECT_X_END
                        crop = frame_orig[rect_y1:rect_y2, rect_x1:rect_x2]
                        if crop.size > 0:
                            save_path = os.path.join(SAVE_DIR_GREEN, f"crop_{save_counter_green:04d}.jpg")
                            cv2.imwrite(save_path, crop)
                            save_counter_green += 1
                    previous_positions = current_positions_in_frame
                # --- Drawing logic ---
                cv2.line(frame, (line[0], line[1]), (line[2], line[1]), (0, 255, 0), thickness=2)
                cv2.line(frame, (line[0], TRIGGER_LINE_Y + TRIGGER_TOLERANCE), (line[2], TRIGGER_LINE_Y + TRIGGER_TOLERANCE), (0, 100, 0), thickness=1, lineType=cv2.LINE_AA)
                if tracks.size > 0:
                    for track in tracks:
                        x1, y1, x2, y2, track_id = map(int, track)
                        cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 255), 2)
                        cvzone.putTextRect(frame, str(track_id), [x1, cy + 5], scale=0.8, thickness=1, colorR=(0, 255, 255), colorT=(255, 0, 0), offset=5)
                if trigger_object_cy is not None:
                    rect_y1 = max(0, trigger_object_cy - BLUE_RECT_FIXED_HEIGHT // 4 - 10)
                    rect_y2 = min(height, trigger_object_cy + BLUE_RECT_FIXED_HEIGHT // 4 - 10)
                    cv2.rectangle(frame, (BLUE_RECT_X_START, rect_y1), (BLUE_RECT_X_END, rect_y2), (255, 0, 0), 3)
                cvzone.putTextRect(frame, f'Count: {wafer_count}', (320, 50), scale=1, thickness=2, colorT=(0, 0, 0), colorR=(255, 127, 0))
                cvzone.putTextRect(frame, "STATUS: PROCESSING", (10, 30), scale=1, thickness=2, colorR=(0, 200, 0))
                if tracks.size > 0:
                    with analysis_lock:
                        try:
                            save_path = os.path.join(SAVE_DIR_TRACKED, f"tracked_{tracked_counter:04d}.jpg")
                            cv2.imwrite(save_path, frame)
                            tracked_counter += 1
                        except Exception as e:
                            write_log(f"Failed to save tracked frame to {SAVE_DIR_TRACKED}: {e}")
            else:
                if not model_loaded_event.is_set():
                    cvzone.putTextRect(frame, "STATUS: LOADING MODEL...", (10, 30), scale=1, thickness=2, colorR=(0, 165, 255))
                else:
                    cvzone.putTextRect(frame, "STATUS: WAITING FOR START COMMAND", (10, 30), scale=1, thickness=2, colorR=(0, 165, 255))
            try:
                display_queue.get_nowait()
            except Empty:
                pass
            try:
                display_queue.put_nowait(frame)
            except Full:
                pass
        except Empty:
            time.sleep(0.01)
        except Exception as e:
            write_log(f"!!! Fatal error in inference thread: {e}")
            traceback.print_exc()
            time.sleep(1)

# ==================== Main program ====================
def main():
    global exit_flag
    setup_directories()
    reset_analysis_state()
    cap = open_camera()
    if cap is None:
        exit_flag = True
        sys.exit()
    # Important: measure the actual capture FPS before starting any capture
    # threads, and use that value to create the VideoWriter
    measured_fps = measure_capture_fps(cap, sample_seconds=1.5)
    write_log(f"Measured capture FPS ≈ {measured_fps:.2f}")
    # Create threads
    all_threads = [
        threading.Thread(target=load_model_thread, daemon=True),
        threading.Thread(target=tcp_server_thread, daemon=True),
        threading.Thread(target=record_thread, args=(measured_fps,), daemon=True),  # Write the file at the measured FPS
        threading.Thread(target=camera_capture_thread, args=(cap,), daemon=True, name="camera_capture_thread"),
        threading.Thread(target=inference_thread, daemon=True, name="inference_thread")
    ]
    for t in all_threads:
        t.start()
    prev_time = 0
    write_log("Program started. Waiting for the model to finish loading and for the 'start' command...")
    while not exit_flag:
        try:
            display_frame = display_queue.get(timeout=1)
            if display_frame is None:
                break
            curr_time = time.time()
            if prev_time > 0:
                fps = 1 / (curr_time - prev_time)
                cv2.putText(display_frame, f"Display FPS: {int(fps)}", (CAMERA_WIDTH - 200, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
            prev_time = curr_time
            cv2.imshow('Wafer Detection Service', display_frame)
        except Empty:
            inference_is_alive = any(t.name == 'inference_thread' and t.is_alive() for t in all_threads)
            if not inference_is_alive:
                write_log("Error: inference thread terminated unexpectedly! Check the logs. Exiting.")
                exit_flag = True
            else:
                write_log("Warning: display queue has had no frames for a while, but the inference thread is still running.")
            continue
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q') or cv2.getWindowProperty('Wafer Detection Service', cv2.WND_PROP_VISIBLE) < 1:
            write_log("Manual exit requested...")
            exit_flag = True
    exit_flag = True
    for q in [record_queue, process_queue, display_queue]:
        try:
            q.put_nowait(None)
        except Full:
            pass
    for t in all_threads:
        t.join(timeout=2.0)
    cv2.destroyAllWindows()
    write_log("Main loop ended, resources released.")

if __name__ == "__main__":
    main()
    write_log("Main program finished.")
```
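For reference, a minimal control client for the service above might look like the following; it assumes the service runs on localhost with the default TCP_PORT (5656), and the 60-second wait is just an example:

```python
# Hypothetical control client: send "start", let the service record and
# analyze for a while, then send "stop" and read the final result line.
import socket
import time

with socket.create_connection(("127.0.0.1", 5656)) as s:
    s.sendall(b"start")
    print(s.recv(1024).decode().strip())  # "Analysis and recording started."
    time.sleep(60)                        # let it record/analyze
    s.sendall(b"stop")
    print(s.recv(1024).decode().strip())  # "Processing stopped. ..."
    print(s.recv(4096).decode().strip())  # "Result: ..."
```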

Summary of key changes

  • Added measure_capture_fps(), which measures the actual capture frame rate for 1.5 s at the start of main() and passes the value to record_thread.
  • create_video_writer() now builds the MP4 with fps_for_file (the measured value), so the header FPS matches the actual write rate.
  • camera_capture_thread() now uses record_queue.put(..., timeout=0.5) for the record queue, avoiding silent frame drops.
  • record_queue capacity enlarged to FPS * 4 to absorb short bursts.

If you also want to log the actual number of frames written versus the expected number, to estimate the drop rate, I can add simple statistics logging for that as well.
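In the meantime, a minimal sketch like the one below can verify the fix on its own: it reads back the recorded file's frame count and header FPS and compares the implied duration with your wall-clock recording time. The file name here is hypothetical.

```python
import cv2

def report_duration(path, wall_clock_seconds):
    # Duration implied by the file header vs. the real recording time.
    cap = cv2.VideoCapture(path)
    frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    fps = cap.get(cv2.CAP_PROP_FPS)
    cap.release()
    if fps > 0:
        print(f"{path}: {frames:.0f} frames @ {fps:.2f} fps "
              f"-> {frames / fps:.1f} s (recorded for {wall_clock_seconds:.1f} s)")

report_duration("Records/record_20250101_120000_1.mp4", 60.0)
```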
